Refactor main-thread checks w/ Update Mixin and requirement to 1.7.10

This commit is contained in:
Sotr
2018-06-18 15:53:33 +08:00
parent e4ed177d7b
commit 8a9ad35a57
15 changed files with 107 additions and 132 deletions

View File

@@ -131,12 +131,7 @@
<dependency>
<groupId>org.spongepowered</groupId>
<artifactId>mixin</artifactId>
<version>0.7.8-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>me.nallar.whocalled</groupId>
<artifactId>WhoCalled</artifactId>
<version>1.1</version>
<version>0.7.10-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@@ -13,6 +13,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import co.aikar.timings.Timing;
import co.aikar.timings.Timings;
import io.akarin.server.core.AkarinGlobalConfig;
import net.minecraft.server.MinecraftServer;
@SuppressWarnings("restriction")
@@ -22,30 +23,40 @@ public abstract class Akari {
*/
public final static Logger logger = LogManager.getLogger("Akarin");
/**
* Temporarily disable desync timings error, moreover it's worthless to trace async operation
*/
public static volatile boolean silentTiming;
/**
* A common thread pool factory
*/
public static final ThreadFactory STAGE_FACTORY = new ThreadFactoryBuilder().setNameFormat("Akarin Schedule Thread").build();
public static final ThreadFactory STAGE_FACTORY = new ThreadFactoryBuilder().setNameFormat("Akarin Parallel Registry Thread - %1$d").build();
/**
* Main thread callback tasks
*/
public static final Queue<Runnable> callbackQueue = Queues.newConcurrentLinkedQueue();
public static class AssignableThread extends Thread {
public AssignableThread(Runnable run) {
super(run);
}
}
private static class AssignableFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable run) {
Thread thread = new AssignableThread(run);
thread.setName("Akarin Parallel Schedule Thread");
thread.setPriority(AkarinGlobalConfig.primaryThreadPriority); // Fair
return thread;
}
}
/**
* A common tick pool
*/
public static final ExecutorCompletionService<?> STAGE_TICK = new ExecutorCompletionService<>(Executors.newSingleThreadExecutor(Akari.STAGE_FACTORY));
public static boolean mayEnableAsyncCathcer;
public static final ExecutorCompletionService<?> STAGE_TICK = new ExecutorCompletionService<>(Executors.newSingleThreadExecutor(new AssignableFactory()));
public static boolean isPrimaryThread() {
return Thread.currentThread().equals(MinecraftServer.getServer().primaryThread);
Thread current = Thread.currentThread();
return current == MinecraftServer.getServer().primaryThread || current instanceof AssignableThread;
}
public static final String EMPTY_STRING = "";
@@ -69,7 +80,9 @@ public abstract class Akari {
/*
* Timings
*/
public final static Timing worldTiming = getTiming("Akarin - World");
public final static Timing worldTiming = getTiming("Akarin - Full World Tick");
public final static Timing entityCallbackTiming = getTiming("Akarin - Entity Callback");
public final static Timing callbackTiming = getTiming("Akarin - Callback");

View File

@@ -195,21 +195,6 @@ public class AkarinGlobalConfig {
asyncLightingWorkStealing = getBoolean("core.async-lighting.use-work-stealing", false);
}
public static boolean enableMockPlugin;
private static void enableMockPlugin() {
enableMockPlugin = getBoolean("core.thread-safe.enable-mock-plugins", false);
}
public static List<String> mockPackageList;
private static void mockPluginList() {
mockPackageList = getList("core.thread-safe.mock-package-name-contains", Lists.newArrayList("me.konsolas.aac"));
}
public static boolean enableAsyncCatcher;
private static void enableAsyncCatcher() {
enableAsyncCatcher = getBoolean("core.thread-safe.async-catcher.enable", false);
}
public static boolean throwOnAsyncCaught;
private static void throwOnAsyncCaught() {
throwOnAsyncCaught = getBoolean("core.thread-safe.async-catcher.throw-on-caught", true);

View File

@@ -1,49 +0,0 @@
package io.akarin.server.core;
import java.util.List;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import net.minecraft.server.EnumProtocolDirection;
import net.minecraft.server.HandshakeListener;
import net.minecraft.server.LegacyPingHandler;
import net.minecraft.server.MinecraftServer;
import net.minecraft.server.NetworkManager;
import net.minecraft.server.PacketDecoder;
import net.minecraft.server.PacketEncoder;
import net.minecraft.server.PacketPrepender;
import net.minecraft.server.PacketSplitter;
public class ChannelAdapter extends ChannelInitializer<Channel> {
private final List<NetworkManager> managers;
public ChannelAdapter(List<NetworkManager> list) {
managers = list;
}
public static ChannelAdapter create(List<NetworkManager> managers) {
return new ChannelAdapter(managers);
}
@Override
protected void initChannel(Channel channel) {
try {
channel.config().setOption(ChannelOption.TCP_NODELAY, true);
} catch (ChannelException ex) {
;
}
channel.pipeline().addLast("timeout", new ReadTimeoutHandler(30))
.addLast("legacy_query", new LegacyPingHandler(MinecraftServer.getServer().getServerConnection()))
.addLast("splitter", new PacketSplitter()).addLast("decoder", new PacketDecoder(EnumProtocolDirection.SERVERBOUND))
.addLast("prepender", new PacketPrepender()).addLast("encoder", new PacketEncoder(EnumProtocolDirection.CLIENTBOUND));
NetworkManager manager = new NetworkManager(EnumProtocolDirection.SERVERBOUND);
managers.add(manager);
channel.pipeline().addLast("packet_handler", manager);
manager.setPacketListener(new HandshakeListener(MinecraftServer.getServer(), manager));
}
}

View File

@@ -1,21 +0,0 @@
package io.akarin.server.core;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import net.minecraft.server.ChatComponentText;
import net.minecraft.server.NetworkManager;
public class NetworkCloseHandler implements GenericFutureListener<Future<? super Void>> {
private final NetworkManager manager;
private final ChatComponentText message;
public NetworkCloseHandler(NetworkManager instance, ChatComponentText text) {
manager = instance;
message = text;
}
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
manager.close(message);
}
}

View File

@@ -7,7 +7,6 @@ import org.spongepowered.asm.mixin.Shadow;
import io.akarin.api.internal.Akari;
import io.akarin.server.core.AkarinGlobalConfig;
import net.minecraft.server.MinecraftServer;
@Mixin(value = AsyncCatcher.class, remap = false)
public abstract class MixinAsyncCatcher {
@@ -15,7 +14,9 @@ public abstract class MixinAsyncCatcher {
@Overwrite
public static void catchOp(String reason) {
if (AkarinGlobalConfig.enableAsyncCatcher && Akari.mayEnableAsyncCathcer && enabled && Thread.currentThread() != MinecraftServer.getServer().primaryThread) {
if (enabled) {
if (Akari.isPrimaryThread()) return;
if (AkarinGlobalConfig.throwOnAsyncCaught) {
throw new IllegalStateException("Asynchronous " + reason + "!");
} else {

View File

@@ -9,7 +9,6 @@ import org.spongepowered.asm.mixin.Shadow;
import io.akarin.api.internal.Akari;
import io.akarin.server.core.AkarinGlobalConfig;
import me.nallar.whocalled.WhoCalled;
import net.minecraft.server.MinecraftServer;
@Mixin(value = CraftServer.class, remap = false)
@@ -31,15 +30,6 @@ public abstract class MixinCraftServer {
@Overwrite
public boolean isPrimaryThread() {
if (AkarinGlobalConfig.enableMockPlugin && !AkarinGlobalConfig.mockPackageList.isEmpty()) {
// Mock forcely main thread plugins
String callerPackage = WhoCalled.$.getCallingClass().getPackage().getName();
if (callerPackage.startsWith("net.minecraft") || callerPackage.startsWith("org.bukkit") ||
callerPackage.startsWith("co.aikar") || callerPackage.startsWith("io.akarin")) return Thread.currentThread().equals(console.primaryThread);
for (String contains : AkarinGlobalConfig.mockPackageList) {
if (callerPackage.contains(contains)) return true;
}
}
return Thread.currentThread().equals(console.primaryThread);
return Akari.isPrimaryThread();
}
}

View File

@@ -1,16 +1,38 @@
package io.akarin.server.mixin.core;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import org.bukkit.craftbukkit.util.Waitable;
import org.spigotmc.AsyncCatcher;
import org.spongepowered.asm.mixin.Mixin;
import org.spongepowered.asm.mixin.Overwrite;
import io.akarin.api.internal.Akari;
import net.minecraft.server.MCUtil;
import net.minecraft.server.MinecraftServer;
@Mixin(value = MCUtil.class, remap = false)
public abstract class MixinMCUtil {
@Overwrite
public static <T> T ensureMain(String reason, Supplier<T> run) {
if (AsyncCatcher.enabled && !Akari.isPrimaryThread()) {
new IllegalStateException("Asynchronous " + reason + "! Blocking thread until it returns ").printStackTrace();
Waitable<T> wait = new Waitable<T>() {
@Override
protected T evaluate() {
return run.get();
}
};
MinecraftServer.getServer().processQueue.add(wait);
try {
return wait.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return null;
}
return run.get();
}
}

View File

@@ -58,6 +58,11 @@ public abstract class MixinMinecraftServer {
}
@Overwrite
public boolean isMainThread() {
return Akari.isPrimaryThread();
}
/*
* Forcely disable snooper
*/
@@ -147,8 +152,6 @@ public abstract class MixinMinecraftServer {
worlds.get(i).timings.doTick.startTiming();
}
}
Akari.silentTiming = true; // Disable timings
Akari.mayEnableAsyncCathcer = false;
Akari.STAGE_TICK.submit(() -> {
// Never tick one world concurrently!
for (int i = 1; i <= worlds.size(); ++i) {
@@ -166,9 +169,10 @@ public abstract class MixinMinecraftServer {
}
}
Akari.entityCallbackTiming.startTiming();
Akari.STAGE_TICK.take();
Akari.mayEnableAsyncCathcer = true;
Akari.silentTiming = false; // Enable timings
Akari.entityCallbackTiming.stopTiming();
Akari.worldTiming.stopTiming();
if (AkarinGlobalConfig.legacyWorldTimings) {
for (int i = 0; i < worlds.size(); ++i) {

View File

@@ -36,12 +36,12 @@ public abstract class MixinTimingHandler {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Inject(method = "startTiming", at = @At("HEAD"), cancellable = true)
public void onStartTiming(CallbackInfoReturnable ci) {
if (Akari.silentTiming || !Akari.isPrimaryThread()) ci.setReturnValue(this); // Avoid modify any field
if (!Akari.isPrimaryThread()) ci.setReturnValue(this); // Avoid modify any field
}
@Overwrite
public void stopTimingIfSync() {
if (Akari.isPrimaryThread()) { // Use non-mock method
if (Akari.isPrimaryThread()) {
stopTiming(true); // Avoid twice thread check
}
}
@@ -52,12 +52,12 @@ public abstract class MixinTimingHandler {
}
public void stopTiming(boolean alreadySync) {
if (!enabled || Akari.silentTiming) return;
if (!enabled) return;
if (!alreadySync && !Akari.isPrimaryThread()) {
if (AkarinGlobalConfig.silentAsyncTimings) return;
Bukkit.getLogger().log(Level.SEVERE, "stopTiming called async for " + name);
new Throwable().printStackTrace();
Thread.dumpStack();
}
// Main thread ensured

View File

@@ -20,20 +20,31 @@ import com.google.common.collect.Lists;
import io.akarin.api.internal.LocalAddress;
import io.akarin.server.core.AkarinGlobalConfig;
import io.akarin.server.core.ChannelAdapter;
import io.akarin.server.core.NetworkCloseHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import net.minecraft.server.ChatComponentText;
import net.minecraft.server.EnumProtocolDirection;
import net.minecraft.server.HandshakeListener;
import net.minecraft.server.LegacyPingHandler;
import net.minecraft.server.MinecraftServer;
import net.minecraft.server.NetworkManager;
import net.minecraft.server.PacketDecoder;
import net.minecraft.server.PacketEncoder;
import net.minecraft.server.PacketPlayOutKickDisconnect;
import net.minecraft.server.PacketPrepender;
import net.minecraft.server.PacketSplitter;
import net.minecraft.server.ServerConnection;
@Mixin(value = ServerConnection.class, remap = false)
@@ -76,7 +87,26 @@ public abstract class NonblockingServerConnection {
logger.info("Using nio channel type");
}
ServerBootstrap bootstrap = new ServerBootstrap().channel(channelClass).childHandler(ChannelAdapter.create(networkManagers)).group(loopGroup);
ServerBootstrap bootstrap = new ServerBootstrap().channel(channelClass).childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
try {
channel.config().setOption(ChannelOption.TCP_NODELAY, true);
} catch (ChannelException ex) {
;
}
channel.pipeline().addLast("timeout", new ReadTimeoutHandler(30))
.addLast("legacy_query", new LegacyPingHandler(MinecraftServer.getServer().getServerConnection()))
.addLast("splitter", new PacketSplitter()).addLast("decoder", new PacketDecoder(EnumProtocolDirection.SERVERBOUND))
.addLast("prepender", new PacketPrepender()).addLast("encoder", new PacketEncoder(EnumProtocolDirection.CLIENTBOUND));
NetworkManager manager = new NetworkManager(EnumProtocolDirection.SERVERBOUND);
networkManagers.add(manager);
channel.pipeline().addLast("packet_handler", manager);
manager.setPacketListener(new HandshakeListener(MinecraftServer.getServer(), manager));
}
}).group(loopGroup);
synchronized (endPoints) {
data.addAll(Lists.transform(AkarinGlobalConfig.extraAddress, s -> {
String[] info = s.split(":");
@@ -112,9 +142,14 @@ public abstract class NonblockingServerConnection {
manager.a(); // PAIL: NetworkManager::processReceivedPackets
} catch (Exception ex) {
logger.warn("Failed to handle packet for {}", manager.getSocketAddress(), ex);
final ChatComponentText kick = new ChatComponentText("Internal server error");
final ChatComponentText message = new ChatComponentText("Internal server error");
manager.sendPacket(new PacketPlayOutKickDisconnect(kick), new NetworkCloseHandler(manager, kick), new GenericFutureListener[0]);
manager.sendPacket(new PacketPlayOutKickDisconnect(message), new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
manager.close(message);
}
}, new GenericFutureListener[0]);
manager.stopReading();
}
}

View File

@@ -1,6 +1,6 @@
{
"required": true,
"minVersion": "0.7.8",
"minVersion": "0.7.10",
"package": "io.akarin.server.mixin",
"target": "@env(DEFAULT)",
"compatibilityLevel": "JAVA_8",

View File

@@ -1,6 +1,6 @@
{
"required": true,
"minVersion": "0.7.8",
"minVersion": "0.7.10",
"package": "io.akarin.server.mixin",
"target": "@env(DEFAULT)",
"compatibilityLevel": "JAVA_8",

View File

@@ -1,6 +1,6 @@
{
"required": true,
"minVersion": "0.7.8",
"minVersion": "0.7.10",
"package": "io.akarin.server.mixin",
"target": "@env(DEFAULT)",
"compatibilityLevel": "JAVA_8",

View File

@@ -1,6 +1,6 @@
{
"required": true,
"minVersion": "0.7.8",
"minVersion": "0.7.10",
"package": "io.akarin.server.mixin",
"target": "@env(DEFAULT)",
"compatibilityLevel": "JAVA_8",