Import NetworkManager

This commit is contained in:
Sotr
2018-06-06 21:40:25 +08:00
parent dbee403ba5
commit 93b657436d
7 changed files with 435 additions and 73 deletions

View File

@@ -2,6 +2,8 @@ package io.akarin.api;
import java.lang.reflect.Method;
import java.util.Queue;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.logging.log4j.LogManager;
@@ -34,6 +36,11 @@ public abstract class Akari {
*/
public static final Queue<Runnable> callbackQueue = Queues.newConcurrentLinkedQueue();
/**
* A common tick pool
*/
public static final ExecutorCompletionService<Void> STAGE_TICK = new ExecutorCompletionService<Void>(Executors.newFixedThreadPool(1, Akari.STAGE_FACTORY));
/*
* Timings
*/

View File

@@ -1,9 +0,0 @@
package io.akarin.api;
import java.io.Serializable;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
public class CheckedConcurrentLinkedQueue<E> extends ConcurrentLinkedQueue<E> implements Queue<E>, Serializable {
}

View File

@@ -1,10 +1,7 @@
package io.akarin.server.mixin.core;
import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import org.bukkit.craftbukkit.CraftServer;
import org.bukkit.craftbukkit.chunkio.ChunkIOExecutor;
@@ -15,11 +12,7 @@ import org.spongepowered.asm.mixin.Mutable;
import org.spongepowered.asm.mixin.Overwrite;
import org.spongepowered.asm.mixin.Shadow;
import com.google.common.collect.Queues;
import co.aikar.timings.MinecraftTimings;
import co.aikar.timings.Timing;
import co.aikar.timings.Timings;
import io.akarin.api.Akari;
import net.minecraft.server.CrashReport;
import net.minecraft.server.CustomFunctionData;
@@ -63,8 +56,6 @@ public class MixinMinecraftServer {
@Shadow public ServerConnection an() { return null; }
@Shadow public CustomFunctionData aL() { return null; }
private final ExecutorCompletionService<Void> STAGE_ENTITY_TICK = new ExecutorCompletionService<Void>(Executors.newFixedThreadPool(1, Akari.STAGE_FACTORY));
private void tickEntities(WorldServer world) {
try {
world.tickEntities();
@@ -123,7 +114,7 @@ public class MixinMinecraftServer {
TileEntityHopper.skipHopperEvents = entityWorld.paperConfig.disableHopperMoveEvents || InventoryMoveItemEvent.getHandlerList().getRegisteredListeners().length == 0;
Akari.silentTiming = true;
STAGE_ENTITY_TICK.submit(() -> tickEntities(entityWorld), null);
Akari.STAGE_TICK.submit(() -> tickEntities(entityWorld), null);
try {
mainWorld.timings.doTick.startTiming();
@@ -141,7 +132,7 @@ public class MixinMinecraftServer {
}
entityWorld.timings.tickEntities.startTiming();
STAGE_ENTITY_TICK.take();
Akari.STAGE_TICK.take();
entityWorld.timings.tickEntities.stopTiming();
entityWorld.getTracker().updatePlayers();

View File

@@ -0,0 +1,9 @@
package io.akarin.server.mixin.core;
import org.spongepowered.asm.mixin.Mixin;
import net.minecraft.server.NetworkManager;
@Mixin(value = NetworkManager.class, remap = false)
public class MixinNetworkManager {
}

View File

@@ -5,6 +5,7 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -15,8 +16,9 @@ import org.spongepowered.asm.mixin.Mutable;
import org.spongepowered.asm.mixin.Overwrite;
import org.spongepowered.asm.mixin.Shadow;
import com.google.common.collect.Lists;
import io.akarin.api.Akari;
import io.akarin.api.LocalAddress;
import io.akarin.api.WrappedCollections;
import io.akarin.server.core.AkarinGlobalConfig;
import io.akarin.server.core.ChannelAdapter;
import io.netty.bootstrap.ServerBootstrap;
@@ -47,21 +49,9 @@ public class NonblockingServerConnection {
*/
@Shadow @Mutable @Final private List<NetworkManager> h;
private final List<NetworkManager> pending = WrappedCollections.wrappedList(Collections.synchronizedList(Lists.newLinkedList()));
@Overwrite
private void addPending() {} // just keep compatibility
/**
* Removes all pending endpoints from global NetworkManager list
*/
private void removePending() {
synchronized (pending) {
h.removeAll(pending);
pending.clear();
}
}
@Shadow @Final private MinecraftServer f;
/**
@@ -72,8 +62,6 @@ public class NonblockingServerConnection {
registerChannels(Lists.newArrayList(LocalAddress.create(address, port)));
}
private boolean needDeployList = true;
public void registerChannels(Collection<LocalAddress> data) throws IOException {
Class<? extends ServerChannel> channelClass;
EventLoopGroup loopGroup;
@@ -88,12 +76,6 @@ public class NonblockingServerConnection {
logger.info("Using nio channel type");
}
// Since we cannot overwrite the initializer, here is the best chance to handle it
if (needDeployList) {
h = WrappedCollections.wrappedList(Lists.newCopyOnWriteArrayList());
needDeployList = false;
}
ServerBootstrap bootstrap = new ServerBootstrap().channel(channelClass).childHandler(ChannelAdapter.create(h)).group(loopGroup);
synchronized (g) {
data.addAll(Lists.transform(AkarinGlobalConfig.extraAddress, s -> {
@@ -125,21 +107,7 @@ public class NonblockingServerConnection {
}
}
/**
* Will try to process the packets received by each NetworkManager, gracefully manage processing failures and cleans up dead connections (tick)
*/
@Overwrite
public void c() {
// Spigot - This prevents players from 'gaming' the server, and strategically relogging to increase their position in the tick order
if (SpigotConfig.playerShuffle > 0 && MinecraftServer.currentTick % SpigotConfig.playerShuffle == 0) {
Collections.shuffle(h);
}
boolean needRemoval = false;
for (NetworkManager manager : h) {
if (manager.h()) continue; // PAIL: NetworkManager::hasNoChannel
if (manager.isConnected()) {
public void processPackets(NetworkManager manager) {
try {
manager.a(); // PAIL: NetworkManager::processReceivedPackets
} catch (Exception ex) {
@@ -154,18 +122,41 @@ public class NonblockingServerConnection {
}, new GenericFutureListener[0]);
manager.stopReading();
}
}
/**
* Will try to process the packets received by each NetworkManager, gracefully manage processing failures and cleans up dead connections (tick)
*/
@Overwrite
public void c() throws InterruptedException {
synchronized (h) {
// Spigot - This prevents players from 'gaming' the server, and strategically relogging to increase their position in the tick order
if (SpigotConfig.playerShuffle > 0 && MinecraftServer.currentTick % SpigotConfig.playerShuffle == 0) {
Collections.shuffle(h);
}
int submitted = 0;
Iterator<NetworkManager> it = h.iterator();
while (it.hasNext()) {
NetworkManager manager = it.next();
if (manager.h()) continue; // PAIL: NetworkManager::hasNoChannel
if (manager.isConnected()) {
Akari.STAGE_TICK.submit(() -> processPackets(manager), null);
submitted++;
} else {
// Spigot - Fix a race condition where a NetworkManager could be unregistered just before connection.
if (manager.preparing) continue;
needRemoval = true;
pending.add(manager);
it.remove();
manager.handleDisconnection();
}
}
if (needRemoval) removePending();
for (int i = 0; i < submitted; i++) {
Akari.STAGE_TICK.take();
}
}
}
}

View File

@@ -0,0 +1,372 @@
package net.minecraft.server;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.local.LocalServerChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.timeout.TimeoutException;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.SocketAddress;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Nullable;
import javax.crypto.SecretKey;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.Validate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.Marker;
import org.apache.logging.log4j.MarkerManager;
public class NetworkManager extends SimpleChannelInboundHandler<Packet<?>> {
private static final Logger g = LogManager.getLogger();
public static final Marker a = MarkerManager.getMarker("NETWORK");
public static final Marker b = MarkerManager.getMarker("NETWORK_PACKETS", NetworkManager.a);
public static final AttributeKey<EnumProtocol> c = AttributeKey.valueOf("protocol");
public static final LazyInitVar<NioEventLoopGroup> d = new LazyInitVar() {
protected NioEventLoopGroup a() {
return new NioEventLoopGroup(0, (new ThreadFactoryBuilder()).setNameFormat("Netty Client IO #%d").setDaemon(true).build());
}
protected Object init() {
return this.a();
}
};
public static final LazyInitVar<EpollEventLoopGroup> e = new LazyInitVar() {
protected EpollEventLoopGroup a() {
return new EpollEventLoopGroup(0, (new ThreadFactoryBuilder()).setNameFormat("Netty Epoll Client IO #%d").setDaemon(true).build());
}
protected Object init() {
return this.a();
}
};
public static final LazyInitVar<LocalEventLoopGroup> f = new LazyInitVar() {
protected LocalEventLoopGroup a() {
return new LocalEventLoopGroup(0, (new ThreadFactoryBuilder()).setNameFormat("Netty Local Client IO #%d").setDaemon(true).build());
}
protected Object init() {
return this.a();
}
};
private final EnumProtocolDirection h;
private final Queue<NetworkManager.QueuedPacket> i = Queues.newConcurrentLinkedQueue(); private final Queue<NetworkManager.QueuedPacket> getPacketQueue() { return this.i; } // Paper - Anti-Xray - OBFHELPER
private final ReentrantReadWriteLock j = new ReentrantReadWriteLock();
public Channel channel;
// Spigot Start // PAIL
public SocketAddress l;
public java.util.UUID spoofedUUID;
public com.mojang.authlib.properties.Property[] spoofedProfile;
public boolean preparing = true;
// Spigot End
private PacketListener m;
private IChatBaseComponent n;
private boolean o;
private boolean p;
// Paper start - NetworkClient implementation
public int protocolVersion;
public java.net.InetSocketAddress virtualHost;
private static boolean enableExplicitFlush = Boolean.getBoolean("paper.explicit-flush");
// Paper end
public NetworkManager(EnumProtocolDirection enumprotocoldirection) {
this.h = enumprotocoldirection;
}
public void channelActive(ChannelHandlerContext channelhandlercontext) throws Exception {
super.channelActive(channelhandlercontext);
this.channel = channelhandlercontext.channel();
this.l = this.channel.remoteAddress();
// Spigot Start
this.preparing = false;
// Spigot End
try {
this.setProtocol(EnumProtocol.HANDSHAKING);
} catch (Throwable throwable) {
NetworkManager.g.fatal(throwable);
}
}
public void setProtocol(EnumProtocol enumprotocol) {
this.channel.attr(NetworkManager.c).set(enumprotocol);
this.channel.config().setAutoRead(true);
NetworkManager.g.debug("Enabled auto read");
}
public void channelInactive(ChannelHandlerContext channelhandlercontext) throws Exception {
this.close(new ChatMessage("disconnect.endOfStream", new Object[0]));
}
public void exceptionCaught(ChannelHandlerContext channelhandlercontext, Throwable throwable) throws Exception {
ChatMessage chatmessage;
if (throwable instanceof TimeoutException) {
chatmessage = new ChatMessage("disconnect.timeout", new Object[0]);
} else {
chatmessage = new ChatMessage("disconnect.genericReason", new Object[] { "Internal Exception: " + throwable});
}
NetworkManager.g.debug(chatmessage.toPlainText(), throwable);
this.close(chatmessage);
if (MinecraftServer.getServer().isDebugging()) throwable.printStackTrace(); // Spigot
}
protected void a(ChannelHandlerContext channelhandlercontext, Packet<?> packet) throws Exception {
if (this.channel.isOpen()) {
try {
((Packet) packet).a(this.m); // CraftBukkit - decompile error
} catch (CancelledPacketHandleException cancelledpackethandleexception) {
;
}
}
}
public void setPacketListener(PacketListener packetlistener) {
Validate.notNull(packetlistener, "packetListener", new Object[0]);
NetworkManager.g.debug("Set listener of {} to {}", this, packetlistener);
this.m = packetlistener;
}
public void sendPacket(Packet<?> packet) {
if (this.isConnected() && this.trySendQueue() && !(packet instanceof PacketPlayOutMapChunk && !((PacketPlayOutMapChunk) packet).isReady())) { // Paper - Async-Anti-Xray - Add chunk packets which are not ready or all packets if the queue contains chunk packets which are not ready to the queue and send the packets later in the right order
//this.m(); // Paper - Async-Anti-Xray - Move to if statement (this.trySendQueue())
this.a(packet, (GenericFutureListener[]) null);
} else {
this.j.writeLock().lock();
try {
this.i.add(new NetworkManager.QueuedPacket(packet, new GenericFutureListener[0]));
} finally {
this.j.writeLock().unlock();
}
}
}
public void sendPacket(Packet<?> packet, GenericFutureListener<? extends Future<? super Void>> genericfuturelistener, GenericFutureListener<? extends Future<? super Void>>... agenericfuturelistener) {
if (this.isConnected() && this.trySendQueue() && !(packet instanceof PacketPlayOutMapChunk && !((PacketPlayOutMapChunk) packet).isReady())) { // Paper - Async-Anti-Xray - Add chunk packets which are not ready or all packets if the queue contains chunk packets which are not ready to the queue and send the packets later in the right order
//this.m(); // Paper - Async-Anti-Xray - Move to if statement (this.trySendQueue())
this.a(packet, (GenericFutureListener[]) ArrayUtils.add(agenericfuturelistener, 0, genericfuturelistener));
} else {
this.j.writeLock().lock();
try {
this.i.add(new NetworkManager.QueuedPacket(packet, (GenericFutureListener[]) ArrayUtils.add(agenericfuturelistener, 0, genericfuturelistener)));
} finally {
this.j.writeLock().unlock();
}
}
}
private void dispatchPacket(final Packet<?> packet, @Nullable final GenericFutureListener<? extends Future<? super Void>>[] genericFutureListeners) { this.a(packet, genericFutureListeners); } // Paper - Anti-Xray - OBFHELPER
private void a(final Packet<?> packet, @Nullable final GenericFutureListener<? extends Future<? super Void>>[] agenericfuturelistener) {
final EnumProtocol enumprotocol = EnumProtocol.a(packet);
final EnumProtocol enumprotocol1 = (EnumProtocol) this.channel.attr(NetworkManager.c).get();
if (enumprotocol1 != enumprotocol) {
NetworkManager.g.debug("Disabled auto read");
this.channel.config().setAutoRead(false);
}
if (this.channel.eventLoop().inEventLoop()) {
if (enumprotocol != enumprotocol1) {
this.setProtocol(enumprotocol);
}
ChannelFuture channelfuture = this.channel.writeAndFlush(packet);
if (agenericfuturelistener != null) {
channelfuture.addListeners(agenericfuturelistener);
}
channelfuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
} else {
this.channel.eventLoop().execute(new Runnable() {
public void run() {
if (enumprotocol != enumprotocol1) {
NetworkManager.this.setProtocol(enumprotocol);
}
ChannelFuture channelfuture = NetworkManager.this.channel.writeAndFlush(packet);
if (agenericfuturelistener != null) {
channelfuture.addListeners(agenericfuturelistener);
}
channelfuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
}
});
}
}
// Paper start - Async-Anti-Xray - Stop dispatching further packets and return false if the peeked packet is a chunk packet which is not ready
private boolean trySendQueue() { return this.m(); } // OBFHELPER
private boolean m() { // void -> boolean
if (this.channel != null && this.channel.isOpen()) {
if (this.i.isEmpty()) { // return if the packet queue is empty so that the write lock by Anti-Xray doesn't affect the vanilla performance at all
return true;
}
this.j.writeLock().lock(); // readLock -> writeLock (because of race condition between peek and poll)
try {
while (!this.i.isEmpty()) {
NetworkManager.QueuedPacket networkmanager_queuedpacket = (NetworkManager.QueuedPacket) this.getPacketQueue().peek(); // poll -> peek
if (networkmanager_queuedpacket != null) { // Fix NPE (Spigot bug caused by handleDisconnection())
if (networkmanager_queuedpacket.getPacket() instanceof PacketPlayOutMapChunk && !((PacketPlayOutMapChunk) networkmanager_queuedpacket.getPacket()).isReady()) { // Check if the peeked packet is a chunk packet which is not ready
return false; // Return false if the peeked packet is a chunk packet which is not ready
} else {
this.getPacketQueue().poll(); // poll here
this.dispatchPacket(networkmanager_queuedpacket.getPacket(), networkmanager_queuedpacket.getGenericFutureListeners()); // dispatch the packet
}
}
}
} finally {
this.j.writeLock().unlock(); // readLock -> writeLock (because of race condition between peek and poll)
}
}
return true; // Return true if all packets were dispatched
}
// Paper end
public void a() {
this.m();
if (this.m instanceof ITickable) {
((ITickable) this.m).e();
}
if (this.channel != null) {
if (enableExplicitFlush) this.channel.eventLoop().execute(() -> this.channel.flush()); // Paper - we don't need to explicit flush here, but allow opt in incase issues are found to a better version
}
}
public SocketAddress getSocketAddress() {
return this.l;
}
public void close(IChatBaseComponent ichatbasecomponent) {
// Spigot Start
this.preparing = false;
// Spigot End
if (this.channel.isOpen()) {
this.channel.close(); // We can't wait as this may be called from an event loop.
this.n = ichatbasecomponent;
}
}
public boolean isLocal() {
return this.channel instanceof LocalChannel || this.channel instanceof LocalServerChannel;
}
public void a(SecretKey secretkey) {
this.o = true;
this.channel.pipeline().addBefore("splitter", "decrypt", new PacketDecrypter(MinecraftEncryption.a(2, secretkey)));
this.channel.pipeline().addBefore("prepender", "encrypt", new PacketEncrypter(MinecraftEncryption.a(1, secretkey)));
}
public boolean isConnected() {
return this.channel != null && this.channel.isOpen();
}
public boolean h() {
return this.channel == null;
}
public PacketListener i() {
return this.m;
}
public IChatBaseComponent j() {
return this.n;
}
public void stopReading() {
this.channel.config().setAutoRead(false);
}
public void setCompressionLevel(int i) {
if (i >= 0) {
if (this.channel.pipeline().get("decompress") instanceof PacketDecompressor) {
((PacketDecompressor) this.channel.pipeline().get("decompress")).a(i);
} else {
this.channel.pipeline().addBefore("decoder", "decompress", new PacketDecompressor(i));
}
if (this.channel.pipeline().get("compress") instanceof PacketCompressor) {
((PacketCompressor) this.channel.pipeline().get("compress")).a(i);
} else {
this.channel.pipeline().addBefore("encoder", "compress", new PacketCompressor(i));
}
} else {
if (this.channel.pipeline().get("decompress") instanceof PacketDecompressor) {
this.channel.pipeline().remove("decompress");
}
if (this.channel.pipeline().get("compress") instanceof PacketCompressor) {
this.channel.pipeline().remove("compress");
}
}
}
public void handleDisconnection() {
if (this.channel != null && !this.channel.isOpen()) {
if (this.p) {
NetworkManager.g.warn("handleDisconnection() called twice");
} else {
this.p = true;
if (this.j() != null) {
this.i().a(this.j());
} else if (this.i() != null) {
this.i().a(new ChatMessage("multiplayer.disconnect.generic", new Object[0]));
}
this.i.clear(); // Free up packet queue.
}
}
}
protected void channelRead0(ChannelHandlerContext channelhandlercontext, Packet object) throws Exception { // CraftBukkit - fix decompile error
this.a(channelhandlercontext, (Packet) object);
}
static class QueuedPacket {
private final Packet<?> a; private final Packet<?> getPacket() { return this.a; } // Paper - Anti-Xray - OBFHELPER
private final GenericFutureListener<? extends Future<? super Void>>[] b; private final GenericFutureListener<? extends Future<? super Void>>[] getGenericFutureListeners() { return this.b; } // Paper - Anti-Xray - OBFHELPER
public QueuedPacket(Packet<?> packet, GenericFutureListener<? extends Future<? super Void>>... agenericfuturelistener) {
this.a = packet;
this.b = agenericfuturelistener;
}
}
// Spigot Start
public SocketAddress getRawAddress()
{
return this.channel.remoteAddress();
}
// Spigot End
}

View File

@@ -20,6 +20,7 @@
"MixinVersionCommand",
"MixinMinecraftServer",
"MixinChunkIOExecutor",
"MixinPlayerConnection",
"MixinTileEntityEnchantTable"
]
}