diff --git a/sources/src/main/java/io/akarin/api/LocalAddress.java b/sources/src/main/java/io/akarin/api/LocalAddress.java new file mode 100644 index 000000000..f9996394d --- /dev/null +++ b/sources/src/main/java/io/akarin/api/LocalAddress.java @@ -0,0 +1,28 @@ +package io.akarin.api; + +import java.net.InetAddress; + +import javax.annotation.concurrent.Immutable; + +@Immutable +public class LocalAddress { + private final InetAddress host; + private final int port; + + public static LocalAddress create(InetAddress localHost, int localPort) { + return new LocalAddress(localHost, localPort); + } + + public LocalAddress(InetAddress localHost, int localPort) { + host = localHost; + port = localPort; + } + + public InetAddress host() { + return host; + } + + public int port() { + return port; + } +} diff --git a/sources/src/main/java/io/akarin/api/WrappedCollections.java b/sources/src/main/java/io/akarin/api/WrappedCollections.java new file mode 100644 index 000000000..b6afa0962 --- /dev/null +++ b/sources/src/main/java/io/akarin/api/WrappedCollections.java @@ -0,0 +1,209 @@ +package io.akarin.api; + +import java.io.Serializable; +import java.io.ObjectOutputStream; +import java.io.IOException; +import java.util.Collection; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.ListIterator; +import java.util.Objects; +import java.util.Spliterator; +import java.util.function.Consumer; +import java.util.function.Predicate; +import java.util.function.UnaryOperator; +import java.util.stream.Stream; + +public class WrappedCollections { + // Wrappers + public static Collection wrappedCollection(Collection c) { + return new WrappedCollection<>(c); + } + + /** + * @serial include + */ + static class WrappedCollection implements Collection, Serializable { + private static final long serialVersionUID = 3053995032091335093L; + + final Collection list; // Backing Collection + final Object mutex; // Object on which to synchronize + + WrappedCollection(Collection c) { + this.list = Objects.requireNonNull(c); + mutex = this; + } + + @Override + public int size() { + return list.size(); + } + @Override + public boolean isEmpty() { + return list.isEmpty(); + } + @Override + public boolean contains(Object o) { + return list.contains(o); + } + @Override + public Object[] toArray() { + return list.toArray(); + } + @Override + public T[] toArray(T[] a) { + return list.toArray(a); + } + + @Override + public Iterator iterator() { + return list.iterator(); + } + + @Override + public boolean add(E e) { + return list.add(e); + } + @Override + public boolean remove(Object o) { + return list.remove(o); + } + + @Override + public boolean containsAll(Collection coll) { + return list.containsAll(coll); + } + @Override + public boolean addAll(Collection coll) { + return list.addAll(coll); + } + @Override + public boolean removeAll(Collection coll) { + return list.removeAll(coll); + } + @Override + public boolean retainAll(Collection coll) { + return list.retainAll(coll); + } + @Override + public void clear() { + list.clear(); + } + @Override + public String toString() { + return list.toString(); + } + // Override default methods in Collection + @Override + public void forEach(Consumer consumer) { + list.forEach(consumer); + } + @Override + public boolean removeIf(Predicate filter) { + return list.removeIf(filter); + } + @Override + public Spliterator spliterator() { + return list.spliterator(); + } + @Override + public Stream stream() { + return list.stream(); + } + @Override + public Stream parallelStream() { + return list.parallelStream(); + } + private void writeObject(ObjectOutputStream s) throws IOException { + s.defaultWriteObject(); + } + } + + public static List wrappedList(List list) { + return new WrappedList<>(list); + } + + /** + * @serial include + */ + static class WrappedList extends WrappedCollection implements List { + private static final long serialVersionUID = -7754090372962971524L; + + final List list; + + WrappedList(List list) { + super(list); + this.list = list; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + return list.equals(o); + } + @Override + public int hashCode() { + return list.hashCode(); + } + + @Override + public E get(int index) { + return list.get(index); + } + @Override + public E set(int index, E element) { + return list.set(index, element); + } + @Override + public void add(int index, E element) { + list.add(index, element); + } + @Override + public E remove(int index) { + return list.remove(index); + } + + @Override + public int indexOf(Object o) { + return list.indexOf(o); + } + @Override + public int lastIndexOf(Object o) { + return list.lastIndexOf(o); + } + + @Override + public boolean addAll(int index, Collection c) { + return list.addAll(index, c); + } + + @Override + public ListIterator listIterator() { + return list.listIterator(); + } + + @Override + public ListIterator listIterator(int index) { + return list.listIterator(index); + } + + @Override + public List subList(int fromIndex, int toIndex) { + return new WrappedList<>(list.subList(fromIndex, toIndex)); + } + + @Override + public void replaceAll(UnaryOperator operator) { + list.replaceAll(operator); + } + @Override + public void sort(Comparator c) { + list.sort(c); + } + + private Object readResolve() { + return this; + } + } +} \ No newline at end of file diff --git a/sources/src/main/java/io/akarin/server/core/ChannelAdapter.java b/sources/src/main/java/io/akarin/server/core/ChannelAdapter.java new file mode 100644 index 000000000..b27c71309 --- /dev/null +++ b/sources/src/main/java/io/akarin/server/core/ChannelAdapter.java @@ -0,0 +1,49 @@ +package io.akarin.server.core; + +import java.util.List; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.handler.timeout.ReadTimeoutHandler; +import net.minecraft.server.EnumProtocolDirection; +import net.minecraft.server.HandshakeListener; +import net.minecraft.server.LegacyPingHandler; +import net.minecraft.server.MinecraftServer; +import net.minecraft.server.NetworkManager; +import net.minecraft.server.PacketDecoder; +import net.minecraft.server.PacketEncoder; +import net.minecraft.server.PacketPrepender; +import net.minecraft.server.PacketSplitter; + +public class ChannelAdapter extends ChannelInitializer { + private final List managers; + + public ChannelAdapter(List list) { + managers = list; + } + + public static ChannelAdapter create(List managers) { + return new ChannelAdapter(managers); + } + + @Override + protected void initChannel(Channel channel) { + try { + channel.config().setOption(ChannelOption.TCP_NODELAY, true); + } catch (ChannelException ex) { + ; + } + channel.pipeline().addLast("timeout", new ReadTimeoutHandler(30)) + .addLast("legacy_query", new LegacyPingHandler(MinecraftServer.getServer().getServerConnection())) + .addLast("splitter", new PacketSplitter()).addLast("decoder", new PacketDecoder(EnumProtocolDirection.SERVERBOUND)) + .addLast("prepender", new PacketPrepender()).addLast("encoder", new PacketEncoder(EnumProtocolDirection.CLIENTBOUND)); + + NetworkManager manager = new NetworkManager(EnumProtocolDirection.SERVERBOUND); + managers.add(manager); + + channel.pipeline().addLast("packet_handler", manager); + manager.setPacketListener(new HandshakeListener(MinecraftServer.getServer(), manager)); + } +} diff --git a/sources/src/main/java/io/akarin/server/mixin/core/MixinNetworkManager.java b/sources/src/main/java/io/akarin/server/mixin/core/MixinNetworkManager.java new file mode 100644 index 000000000..0c28daa73 --- /dev/null +++ b/sources/src/main/java/io/akarin/server/mixin/core/MixinNetworkManager.java @@ -0,0 +1,30 @@ +package io.akarin.server.mixin.core; + +import org.spongepowered.asm.mixin.Mixin; +import org.spongepowered.asm.mixin.Overwrite; +import org.spongepowered.asm.mixin.Shadow; +import io.netty.channel.Channel; +import net.minecraft.server.ITickable; +import net.minecraft.server.MCUtil; +import net.minecraft.server.NetworkManager; +import net.minecraft.server.PacketListener; + +@Mixin(value = NetworkManager.class, remap = false) +public class MixinNetworkManager { + @Shadow private boolean m() { return false; } + @Shadow private PacketListener m; + @Shadow public Channel channel; + @Shadow private static boolean enableExplicitFlush; + + @Overwrite + public void a() { + MCUtil.scheduleAsyncTask(() -> m()); + if (this.m instanceof ITickable) { + ((ITickable) this.m).e(); + } + + if (this.channel != null) { + if (enableExplicitFlush) this.channel.eventLoop().execute(() -> this.channel.flush()); // Paper - we don't need to explicit flush here, but allow opt in incase issues are found to a better version + } + } +} diff --git a/sources/src/main/java/io/akarin/server/mixin/core/NonblockingServerConnection.java b/sources/src/main/java/io/akarin/server/mixin/core/NonblockingServerConnection.java new file mode 100644 index 000000000..325b17a44 --- /dev/null +++ b/sources/src/main/java/io/akarin/server/mixin/core/NonblockingServerConnection.java @@ -0,0 +1,162 @@ +package io.akarin.server.mixin.core; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.spigotmc.SpigotConfig; +import org.spongepowered.asm.mixin.Final; +import org.spongepowered.asm.mixin.Mixin; +import org.spongepowered.asm.mixin.Mutable; +import org.spongepowered.asm.mixin.Overwrite; +import org.spongepowered.asm.mixin.Shadow; +import org.spongepowered.asm.mixin.injection.At; +import org.spongepowered.asm.mixin.injection.Inject; +import org.spongepowered.asm.mixin.injection.callback.CallbackInfo; + +import com.google.common.collect.Lists; +import io.akarin.api.LocalAddress; +import io.akarin.api.WrappedCollections; +import io.akarin.server.core.ChannelAdapter; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.ServerChannel; +import io.netty.channel.epoll.Epoll; +import io.netty.channel.epoll.EpollServerSocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import net.minecraft.server.ChatComponentText; +import net.minecraft.server.MinecraftServer; +import net.minecraft.server.NetworkManager; +import net.minecraft.server.PacketPlayOutKickDisconnect; +import net.minecraft.server.ServerConnection; + +@Mixin(value = ServerConnection.class, remap = false) +public class NonblockingServerConnection { + private final static Logger logger = LogManager.getLogger("NSC"); + + /** + * Contains all endpoints added to this NetworkSystem + */ + @Shadow @Mutable @Final private List g; + /** + * A list containing all NetworkManager instances of all endpoints + */ + @Shadow @Mutable @Final private List h; + + private final List pending = WrappedCollections.wrappedList(Collections.synchronizedList(Lists.newLinkedList())); + + @Overwrite + private void addPending() {} // just keep compatibility + + /** + * Removes all pending endpoints from global NetworkManager list + */ + private void removePending() { + synchronized (pending) { + h.removeAll(pending); + pending.clear(); + } + } + + @Inject(method = "", at = @At("RETURN")) + private void deployLists(CallbackInfo info) { + h = WrappedCollections.wrappedList(Lists.newCopyOnWriteArrayList()); + } + + @Shadow @Final private MinecraftServer f; + + /** + * Adds channels (endpoint) that listens on publicly accessible network ports + */ + @Overwrite + public void a(InetAddress address, int port) throws IOException { + registerChannels(Collections.singleton(LocalAddress.create(address, port))); + } + + public void registerChannels(Collection data) throws IOException { + Class channelClass; + EventLoopGroup loopGroup; + + if (Epoll.isAvailable() && this.f.af()) { // PAIL: MinecraftServer::useNativeTransport + channelClass = EpollServerSocketChannel.class; + loopGroup = ServerConnection.b.c(); + logger.info("Using epoll channel type"); + } else { + channelClass = NioServerSocketChannel.class; + loopGroup = ServerConnection.a.c(); + logger.info("Using nio channel type"); + } + + ServerBootstrap bootstrap = new ServerBootstrap().channel(channelClass).childHandler(ChannelAdapter.create(h)).group(loopGroup); + synchronized (g) { + data.forEach(address -> g.add(bootstrap.localAddress(address.host(), address.port()).bind().syncUninterruptibly())); // supports multi-port bind + } + } + + @Shadow public volatile boolean d; // PAIL: neverTerminate + /** + * Shuts down all open endpoints + */ + public void b() { + this.d = false; + try { + synchronized (g) { + for (ChannelFuture channel : g) channel.channel().close().sync(); + } + } catch (InterruptedException ex) { + logger.error("Interrupted whilst closing channel"); + } + } + + /** + * Will try to process the packets received by each NetworkManager, gracefully manage processing failures and cleans up dead connections (tick) + */ + @Overwrite + public void c() { + // Spigot - This prevents players from 'gaming' the server, and strategically relogging to increase their position in the tick order + if (SpigotConfig.playerShuffle > 0 && MinecraftServer.currentTick % SpigotConfig.playerShuffle == 0) { + Collections.shuffle(h); + } + boolean needRemoval = false; + + for (NetworkManager manager : h) { + if (manager.h()) continue; // PAIL: NetworkManager::hasNoChannel + + if (manager.isConnected()) { + try { + manager.a(); // PAIL: NetworkManager::processReceivedPackets + } catch (Exception ex) { + logger.warn("Failed to handle packet for {}", new Object[] { manager.getSocketAddress(), ex }); + final ChatComponentText kick = new ChatComponentText("Internal server error"); + + manager.sendPacket(new PacketPlayOutKickDisconnect(kick), new GenericFutureListener>() { + @Override + public void operationComplete(Future future) throws Exception { + manager.close(kick); + } + }, new GenericFutureListener[0]); + manager.stopReading(); + } + } else { + // Spigot - Fix a race condition where a NetworkManager could be unregistered just before connection. + if (manager.preparing) continue; + + needRemoval = true; + synchronized (pending) { + pending.add(manager); + } + + manager.handleDisconnection(); + } + } + + if (needRemoval) removePending(); + } +} + diff --git a/sources/src/main/java/io/akarin/server/mixin/core/ParallelRegistry.java b/sources/src/main/java/io/akarin/server/mixin/core/ParallelRegistry.java index ef70a5cd7..47f878163 100644 --- a/sources/src/main/java/io/akarin/server/mixin/core/ParallelRegistry.java +++ b/sources/src/main/java/io/akarin/server/mixin/core/ParallelRegistry.java @@ -27,7 +27,7 @@ import net.minecraft.server.SoundEffect; @Mixin(value = DispenserRegistry.class, remap = false) public class ParallelRegistry { private static final ThreadFactory STAGE_FACTORY = new ThreadFactoryBuilder().setNameFormat("Parallel Registry Thread - %1$d").build(); - + /** * Registry order: SoundEffect -> Block */ diff --git a/sources/src/main/java/io/akarin/server/mixin/core/network/NonblockingServerConnection.java b/sources/src/main/java/io/akarin/server/mixin/core/network/NonblockingServerConnection.java deleted file mode 100644 index a083b2030..000000000 --- a/sources/src/main/java/io/akarin/server/mixin/core/network/NonblockingServerConnection.java +++ /dev/null @@ -1,9 +0,0 @@ -package io.akarin.server.mixin.core.network; - -import org.spongepowered.asm.mixin.Mixin; -import net.minecraft.server.ServerConnection; - -@Mixin(value = ServerConnection.class, remap = false) -public class NonblockingServerConnection { - -} diff --git a/sources/src/main/resources/mixins.akarin.core.json b/sources/src/main/resources/mixins.akarin.core.json index d4b94d28c..4cabbc96a 100644 --- a/sources/src/main/resources/mixins.akarin.core.json +++ b/sources/src/main/resources/mixins.akarin.core.json @@ -9,10 +9,12 @@ "Bootstrap", "DummyEula", "ParallelRegistry", + "NonblockingServerConnection", "MixinMetrics", "MixinPaperConfig", "MixinCraftServer", + "MixinNetworkManager", "MixinVersionCommand", "MixinMinecraftServer" ]