Non-blocking ServerConnection (not really) w/ Multi-port support (future)

This commit is contained in:
Sotr
2018-05-29 02:54:40 +08:00
parent a6d796b88d
commit ab6f21addf
8 changed files with 481 additions and 10 deletions

View File

@@ -0,0 +1,28 @@
package io.akarin.api;
import java.net.InetAddress;
import javax.annotation.concurrent.Immutable;
@Immutable
public class LocalAddress {
private final InetAddress host;
private final int port;
public static LocalAddress create(InetAddress localHost, int localPort) {
return new LocalAddress(localHost, localPort);
}
public LocalAddress(InetAddress localHost, int localPort) {
host = localHost;
port = localPort;
}
public InetAddress host() {
return host;
}
public int port() {
return port;
}
}

View File

@@ -0,0 +1,209 @@
package io.akarin.api;
import java.io.Serializable;
import java.io.ObjectOutputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Objects;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Stream;
public class WrappedCollections {
// Wrappers
public static <T> Collection<T> wrappedCollection(Collection<T> c) {
return new WrappedCollection<>(c);
}
/**
* @serial include
*/
static class WrappedCollection<E> implements Collection<E>, Serializable {
private static final long serialVersionUID = 3053995032091335093L;
final Collection<E> list; // Backing Collection
final Object mutex; // Object on which to synchronize
WrappedCollection(Collection<E> c) {
this.list = Objects.requireNonNull(c);
mutex = this;
}
@Override
public int size() {
return list.size();
}
@Override
public boolean isEmpty() {
return list.isEmpty();
}
@Override
public boolean contains(Object o) {
return list.contains(o);
}
@Override
public Object[] toArray() {
return list.toArray();
}
@Override
public <T> T[] toArray(T[] a) {
return list.toArray(a);
}
@Override
public Iterator<E> iterator() {
return list.iterator();
}
@Override
public boolean add(E e) {
return list.add(e);
}
@Override
public boolean remove(Object o) {
return list.remove(o);
}
@Override
public boolean containsAll(Collection<?> coll) {
return list.containsAll(coll);
}
@Override
public boolean addAll(Collection<? extends E> coll) {
return list.addAll(coll);
}
@Override
public boolean removeAll(Collection<?> coll) {
return list.removeAll(coll);
}
@Override
public boolean retainAll(Collection<?> coll) {
return list.retainAll(coll);
}
@Override
public void clear() {
list.clear();
}
@Override
public String toString() {
return list.toString();
}
// Override default methods in Collection
@Override
public void forEach(Consumer<? super E> consumer) {
list.forEach(consumer);
}
@Override
public boolean removeIf(Predicate<? super E> filter) {
return list.removeIf(filter);
}
@Override
public Spliterator<E> spliterator() {
return list.spliterator();
}
@Override
public Stream<E> stream() {
return list.stream();
}
@Override
public Stream<E> parallelStream() {
return list.parallelStream();
}
private void writeObject(ObjectOutputStream s) throws IOException {
s.defaultWriteObject();
}
}
public static <T> List<T> wrappedList(List<T> list) {
return new WrappedList<>(list);
}
/**
* @serial include
*/
static class WrappedList<E> extends WrappedCollection<E> implements List<E> {
private static final long serialVersionUID = -7754090372962971524L;
final List<E> list;
WrappedList(List<E> list) {
super(list);
this.list = list;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
return list.equals(o);
}
@Override
public int hashCode() {
return list.hashCode();
}
@Override
public E get(int index) {
return list.get(index);
}
@Override
public E set(int index, E element) {
return list.set(index, element);
}
@Override
public void add(int index, E element) {
list.add(index, element);
}
@Override
public E remove(int index) {
return list.remove(index);
}
@Override
public int indexOf(Object o) {
return list.indexOf(o);
}
@Override
public int lastIndexOf(Object o) {
return list.lastIndexOf(o);
}
@Override
public boolean addAll(int index, Collection<? extends E> c) {
return list.addAll(index, c);
}
@Override
public ListIterator<E> listIterator() {
return list.listIterator();
}
@Override
public ListIterator<E> listIterator(int index) {
return list.listIterator(index);
}
@Override
public List<E> subList(int fromIndex, int toIndex) {
return new WrappedList<>(list.subList(fromIndex, toIndex));
}
@Override
public void replaceAll(UnaryOperator<E> operator) {
list.replaceAll(operator);
}
@Override
public void sort(Comparator<? super E> c) {
list.sort(c);
}
private Object readResolve() {
return this;
}
}
}

View File

@@ -0,0 +1,49 @@
package io.akarin.server.core;
import java.util.List;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import net.minecraft.server.EnumProtocolDirection;
import net.minecraft.server.HandshakeListener;
import net.minecraft.server.LegacyPingHandler;
import net.minecraft.server.MinecraftServer;
import net.minecraft.server.NetworkManager;
import net.minecraft.server.PacketDecoder;
import net.minecraft.server.PacketEncoder;
import net.minecraft.server.PacketPrepender;
import net.minecraft.server.PacketSplitter;
public class ChannelAdapter extends ChannelInitializer<Channel> {
private final List<NetworkManager> managers;
public ChannelAdapter(List<NetworkManager> list) {
managers = list;
}
public static ChannelAdapter create(List<NetworkManager> managers) {
return new ChannelAdapter(managers);
}
@Override
protected void initChannel(Channel channel) {
try {
channel.config().setOption(ChannelOption.TCP_NODELAY, true);
} catch (ChannelException ex) {
;
}
channel.pipeline().addLast("timeout", new ReadTimeoutHandler(30))
.addLast("legacy_query", new LegacyPingHandler(MinecraftServer.getServer().getServerConnection()))
.addLast("splitter", new PacketSplitter()).addLast("decoder", new PacketDecoder(EnumProtocolDirection.SERVERBOUND))
.addLast("prepender", new PacketPrepender()).addLast("encoder", new PacketEncoder(EnumProtocolDirection.CLIENTBOUND));
NetworkManager manager = new NetworkManager(EnumProtocolDirection.SERVERBOUND);
managers.add(manager);
channel.pipeline().addLast("packet_handler", manager);
manager.setPacketListener(new HandshakeListener(MinecraftServer.getServer(), manager));
}
}

View File

@@ -0,0 +1,30 @@
package io.akarin.server.mixin.core;
import org.spongepowered.asm.mixin.Mixin;
import org.spongepowered.asm.mixin.Overwrite;
import org.spongepowered.asm.mixin.Shadow;
import io.netty.channel.Channel;
import net.minecraft.server.ITickable;
import net.minecraft.server.MCUtil;
import net.minecraft.server.NetworkManager;
import net.minecraft.server.PacketListener;
@Mixin(value = NetworkManager.class, remap = false)
public class MixinNetworkManager {
@Shadow private boolean m() { return false; }
@Shadow private PacketListener m;
@Shadow public Channel channel;
@Shadow private static boolean enableExplicitFlush;
@Overwrite
public void a() {
MCUtil.scheduleAsyncTask(() -> m());
if (this.m instanceof ITickable) {
((ITickable) this.m).e();
}
if (this.channel != null) {
if (enableExplicitFlush) this.channel.eventLoop().execute(() -> this.channel.flush()); // Paper - we don't need to explicit flush here, but allow opt in incase issues are found to a better version
}
}
}

View File

@@ -0,0 +1,162 @@
package io.akarin.server.mixin.core;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.spigotmc.SpigotConfig;
import org.spongepowered.asm.mixin.Final;
import org.spongepowered.asm.mixin.Mixin;
import org.spongepowered.asm.mixin.Mutable;
import org.spongepowered.asm.mixin.Overwrite;
import org.spongepowered.asm.mixin.Shadow;
import org.spongepowered.asm.mixin.injection.At;
import org.spongepowered.asm.mixin.injection.Inject;
import org.spongepowered.asm.mixin.injection.callback.CallbackInfo;
import com.google.common.collect.Lists;
import io.akarin.api.LocalAddress;
import io.akarin.api.WrappedCollections;
import io.akarin.server.core.ChannelAdapter;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import net.minecraft.server.ChatComponentText;
import net.minecraft.server.MinecraftServer;
import net.minecraft.server.NetworkManager;
import net.minecraft.server.PacketPlayOutKickDisconnect;
import net.minecraft.server.ServerConnection;
@Mixin(value = ServerConnection.class, remap = false)
public class NonblockingServerConnection {
private final static Logger logger = LogManager.getLogger("NSC");
/**
* Contains all endpoints added to this NetworkSystem
*/
@Shadow @Mutable @Final private List<ChannelFuture> g;
/**
* A list containing all NetworkManager instances of all endpoints
*/
@Shadow @Mutable @Final private List<NetworkManager> h;
private final List<NetworkManager> pending = WrappedCollections.wrappedList(Collections.synchronizedList(Lists.newLinkedList()));
@Overwrite
private void addPending() {} // just keep compatibility
/**
* Removes all pending endpoints from global NetworkManager list
*/
private void removePending() {
synchronized (pending) {
h.removeAll(pending);
pending.clear();
}
}
@Inject(method = "<init>", at = @At("RETURN"))
private void deployLists(CallbackInfo info) {
h = WrappedCollections.wrappedList(Lists.newCopyOnWriteArrayList());
}
@Shadow @Final private MinecraftServer f;
/**
* Adds channels (endpoint) that listens on publicly accessible network ports
*/
@Overwrite
public void a(InetAddress address, int port) throws IOException {
registerChannels(Collections.singleton(LocalAddress.create(address, port)));
}
public void registerChannels(Collection<LocalAddress> data) throws IOException {
Class<? extends ServerChannel> channelClass;
EventLoopGroup loopGroup;
if (Epoll.isAvailable() && this.f.af()) { // PAIL: MinecraftServer::useNativeTransport
channelClass = EpollServerSocketChannel.class;
loopGroup = ServerConnection.b.c();
logger.info("Using epoll channel type");
} else {
channelClass = NioServerSocketChannel.class;
loopGroup = ServerConnection.a.c();
logger.info("Using nio channel type");
}
ServerBootstrap bootstrap = new ServerBootstrap().channel(channelClass).childHandler(ChannelAdapter.create(h)).group(loopGroup);
synchronized (g) {
data.forEach(address -> g.add(bootstrap.localAddress(address.host(), address.port()).bind().syncUninterruptibly())); // supports multi-port bind
}
}
@Shadow public volatile boolean d; // PAIL: neverTerminate
/**
* Shuts down all open endpoints
*/
public void b() {
this.d = false;
try {
synchronized (g) {
for (ChannelFuture channel : g) channel.channel().close().sync();
}
} catch (InterruptedException ex) {
logger.error("Interrupted whilst closing channel");
}
}
/**
* Will try to process the packets received by each NetworkManager, gracefully manage processing failures and cleans up dead connections (tick)
*/
@Overwrite
public void c() {
// Spigot - This prevents players from 'gaming' the server, and strategically relogging to increase their position in the tick order
if (SpigotConfig.playerShuffle > 0 && MinecraftServer.currentTick % SpigotConfig.playerShuffle == 0) {
Collections.shuffle(h);
}
boolean needRemoval = false;
for (NetworkManager manager : h) {
if (manager.h()) continue; // PAIL: NetworkManager::hasNoChannel
if (manager.isConnected()) {
try {
manager.a(); // PAIL: NetworkManager::processReceivedPackets
} catch (Exception ex) {
logger.warn("Failed to handle packet for {}", new Object[] { manager.getSocketAddress(), ex });
final ChatComponentText kick = new ChatComponentText("Internal server error");
manager.sendPacket(new PacketPlayOutKickDisconnect(kick), new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
manager.close(kick);
}
}, new GenericFutureListener[0]);
manager.stopReading();
}
} else {
// Spigot - Fix a race condition where a NetworkManager could be unregistered just before connection.
if (manager.preparing) continue;
needRemoval = true;
synchronized (pending) {
pending.add(manager);
}
manager.handleDisconnection();
}
}
if (needRemoval) removePending();
}
}

View File

@@ -27,7 +27,7 @@ import net.minecraft.server.SoundEffect;
@Mixin(value = DispenserRegistry.class, remap = false)
public class ParallelRegistry {
private static final ThreadFactory STAGE_FACTORY = new ThreadFactoryBuilder().setNameFormat("Parallel Registry Thread - %1$d").build();
/**
* Registry order: SoundEffect -> Block
*/

View File

@@ -1,9 +0,0 @@
package io.akarin.server.mixin.core.network;
import org.spongepowered.asm.mixin.Mixin;
import net.minecraft.server.ServerConnection;
@Mixin(value = ServerConnection.class, remap = false)
public class NonblockingServerConnection {
}

View File

@@ -9,10 +9,12 @@
"Bootstrap",
"DummyEula",
"ParallelRegistry",
"NonblockingServerConnection",
"MixinMetrics",
"MixinPaperConfig",
"MixinCraftServer",
"MixinNetworkManager",
"MixinVersionCommand",
"MixinMinecraftServer"
]