Ensures packet safety w/ Improve async packet sending - Close #22

This commit is contained in:
Sotr
2018-07-13 20:14:25 +08:00
parent 68b4db992f
commit 5113e99853
7 changed files with 73 additions and 9 deletions

View File

@@ -6,6 +6,9 @@ import java.util.Queue;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import javax.annotation.Nonnull;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.google.common.collect.Queues;
@@ -15,6 +18,8 @@ import co.aikar.timings.Timing;
import co.aikar.timings.Timings;
import io.akarin.server.core.AkarinGlobalConfig;
import net.minecraft.server.MinecraftServer;
import net.minecraft.server.Packet;
import net.minecraft.server.PlayerConnection;
@SuppressWarnings("restriction")
public abstract class Akari {
@@ -33,6 +38,15 @@ public abstract class Akari {
*/
public static final Queue<Runnable> callbackQueue = Queues.newConcurrentLinkedQueue();
/**
* Lock-free packet queue for slack service
*/
public static final Queue<Packet<?>> slackPackets = Queues.newConcurrentLinkedQueue();
public static void sendPacket(PlayerConnection conn, @Nonnull Packet<?> packet) {
if (!conn.processedDisconnect) slackPackets.add(packet);
}
public static class AssignableThread extends Thread {
public AssignableThread(Runnable run) {
super(run);

View File

@@ -0,0 +1,7 @@
package io.akarin.api.internal.mixin;
import net.minecraft.server.EnumDifficulty;
public interface IMixinWorldData {
public void setDifficultyAsync(EnumDifficulty diff);
}

View File

@@ -4,6 +4,7 @@ import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import io.akarin.api.internal.Akari;
import io.akarin.api.internal.mixin.IMixinWorldData;
import net.minecraft.server.EntityPlayer;
import net.minecraft.server.EnumDifficulty;
import net.minecraft.server.MinecraftServer;
@@ -44,7 +45,8 @@ public class AkarinSlackScheduler extends Thread {
// Time update, from MinecraftServer#D
if (++updateTime >= AkarinGlobalConfig.timeUpdateInterval) {
for (EntityPlayer player : server.getPlayerList().players) {
player.playerConnection.sendPacket(new PacketPlayOutUpdateTime(player.world.getTime(), player.getPlayerTime(), player.world.getGameRules().getBoolean("doDaylightCycle"))); // Add support for per player time
// Add support for per player time
Akari.sendPacket(player.playerConnection, new PacketPlayOutUpdateTime(player.world.getTime(), player.getPlayerTime(), player.world.getGameRules().getBoolean("doDaylightCycle")));
}
updateTime = 0;
}
@@ -69,7 +71,7 @@ public class AkarinSlackScheduler extends Thread {
conn.setPendingPing(true);
conn.setLastPing(currentTime);
conn.setKeepAliveID(currentTime);
conn.sendPacket(new PacketPlayOutKeepAlive(conn.getKeepAliveID()));
Akari.sendPacket(conn, new PacketPlayOutKeepAlive(conn.getKeepAliveID()));
}
}
}
@@ -78,17 +80,17 @@ public class AkarinSlackScheduler extends Thread {
if (AkarinGlobalConfig.forceHardcoreDifficulty)
for (WorldServer world : server.worlds) {
if (world.getWorldData().isHardcore() && world.getDifficulty() != EnumDifficulty.HARD) {
world.getWorldData().setDifficulty(EnumDifficulty.HARD);
((IMixinWorldData) world.getWorldData()).setDifficultyAsync(EnumDifficulty.HARD);
}
}
// Update player info, from PlayerList#tick
if (++resendPlayersInfo > AkarinGlobalConfig.playersInfoUpdateInterval) {
for (EntityPlayer target : server.getPlayerList().players) {
target.playerConnection.sendPacket(new PacketPlayOutPlayerInfo(PacketPlayOutPlayerInfo.EnumPlayerInfoAction.UPDATE_LATENCY, Iterables.filter(server.getPlayerList().players, new Predicate<EntityPlayer>() {
for (EntityPlayer player : server.getPlayerList().players) {
Akari.sendPacket(player.playerConnection, new PacketPlayOutPlayerInfo(PacketPlayOutPlayerInfo.EnumPlayerInfoAction.UPDATE_LATENCY, Iterables.filter(server.getPlayerList().players, new Predicate<EntityPlayer>() {
@Override
public boolean apply(EntityPlayer input) {
return target.getBukkitEntity().canSee(input.getBukkitEntity());
public boolean apply(EntityPlayer each) {
return player.getBukkitEntity().canSee(each.getBukkitEntity());
}
})));
}

View File

@@ -0,0 +1,32 @@
package io.akarin.server.mixin.core;
import org.spongepowered.asm.mixin.Mixin;
import org.spongepowered.asm.mixin.Shadow;
import io.akarin.api.internal.Akari;
import io.akarin.api.internal.mixin.IMixinWorldData;
import net.minecraft.server.EntityHuman;
import net.minecraft.server.EntityPlayer;
import net.minecraft.server.EnumDifficulty;
import net.minecraft.server.PacketPlayOutServerDifficulty;
import net.minecraft.server.WorldData;
import net.minecraft.server.WorldServer;
@Mixin(value = WorldData.class, remap = false)
public abstract class MixinWorldData implements IMixinWorldData {
@Shadow(aliases = "C") private volatile EnumDifficulty difficulty;
@Shadow public WorldServer world;
@Shadow abstract public EnumDifficulty getDifficulty();
@Shadow abstract public boolean isDifficultyLocked();
@Override
public void setDifficultyAsync(EnumDifficulty diff) {
difficulty = diff;
PacketPlayOutServerDifficulty packet = new PacketPlayOutServerDifficulty(this.getDifficulty(), this.isDifficultyLocked());
for (EntityHuman player : world.players) {
Akari.sendPacket(((EntityPlayer) player).playerConnection, packet);
}
}
}

View File

@@ -8,6 +8,7 @@ import org.spongepowered.asm.mixin.Mixin;
import org.spongepowered.asm.mixin.Overwrite;
import org.spongepowered.asm.mixin.Shadow;
import io.akarin.api.internal.Akari;
import io.akarin.api.internal.collections.CheckedConcurrentLinkedQueue;
import io.netty.channel.Channel;
import io.netty.util.concurrent.Future;
@@ -26,7 +27,8 @@ public abstract class OptimisticNetworkManager {
@Shadow public abstract Queue<NetworkManager.QueuedPacket> getPacketQueue();
@Shadow public abstract void dispatchPacket(Packet<?> packet, GenericFutureListener<? extends Future<? super Void>>[] genericFutureListeners);
private static final QueuedPacket SIGNAL_PACKET = new QueuedPacket(null, null);
@SuppressWarnings("unchecked")
private static final QueuedPacket SIGNAL_PACKET = new QueuedPacket(null);
@Overwrite // PAIL: trySendQueue
private boolean m() {
@@ -53,6 +55,12 @@ public abstract class OptimisticNetworkManager {
} finally {
this.queueLock.readLock().unlock();
}
// Akarin start - process slack packets
while (!Akari.slackPackets.isEmpty()) {
// Plugins that hook into those packets will notify their listeners later, so keep sync
dispatchPacket(Akari.slackPackets.poll(), null);
}
// Akarin end
}
return true; // Return true if all packets were dispatched

View File

@@ -165,7 +165,7 @@ public class NetworkManager extends SimpleChannelInboundHandler<Packet<?>> {
this.j.writeLock().lock();
try {
this.i.add(new NetworkManager.QueuedPacket(packet, new GenericFutureListener[0]));
this.i.add(new NetworkManager.QueuedPacket(packet)); // Akarin - remove fake listener creation
} finally {
this.j.writeLock().unlock();
}