Optimizes packet sending

This commit is contained in:
Sotr
2019-03-11 16:01:48 +08:00
parent 6f1c0a65ce
commit 734ece203c
3 changed files with 1036 additions and 6 deletions

View File

@@ -18,6 +18,7 @@ import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.SocketAddress;
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -45,7 +46,7 @@ public class NetworkManager extends SimpleChannelInboundHandler<Packet<?>> {
return new DefaultEventLoopGroup(0, (new ThreadFactoryBuilder()).setNameFormat("Netty Local Client IO #%d").setDaemon(true).build());
});
private final EnumProtocolDirection h;
private final Queue<NetworkManager.QueuedPacket> packetQueue = Queues.newConcurrentLinkedQueue(); private final Queue<NetworkManager.QueuedPacket> getPacketQueue() { return this.packetQueue; } // Paper - OBFHELPER
private final io.akarin.server.misc.CheckedConcurrentLinkedQueue<NetworkManager.QueuedPacket> packetQueue = new io.akarin.server.misc.CheckedConcurrentLinkedQueue<NetworkManager.QueuedPacket>(); private final Queue<NetworkManager.QueuedPacket> getPacketQueue() { return this.packetQueue; } // Paper - OBFHELPER // Akarin
private final ReentrantReadWriteLock j = new ReentrantReadWriteLock();
public Channel channel;
public SocketAddress socketAddress; public void setSpoofedRemoteAddress(SocketAddress address) { this.socketAddress = address; } // Paper - OBFHELPER
@@ -167,11 +168,61 @@ public class NetworkManager extends SimpleChannelInboundHandler<Packet<?>> {
this.sendPacket(packet, (GenericFutureListener) null);
}
// Akarin start
public void sendPacket(Packet<?>... packets) {
if (this.isConnected()) { // why send packet to whom not connected?
this.j.readLock().lock();
try {
// Send queued packets
this.sendPacketQueueUnsafe();
// Dispatch or queue new packets
for (Packet<?> packet : packets) {
boolean dispatch = packet instanceof PacketStatusOutPong || packet instanceof PacketStatusOutServerInfo || (packet instanceof PacketPlayOutMapChunk && ((PacketPlayOutMapChunk) packet).isReady());
if (dispatch)
this.dispatchPacket(packet, null);
else {
this.packetQueue.add(new QueuedPacket(packet, null));
}
}
} finally {
this.j.readLock().unlock();
}
}
}
private void sendPacketQueueUnsafe() {
while (!this.packetQueue.isEmpty()) {
QueuedPacket queuedPacket = this.packetQueue.poll(packet ->
packet != null &&
(!(packet.getPacket() instanceof PacketPlayOutMapChunk) ||
(((PacketPlayOutMapChunk) packet.getPacket()).isReady())), null);
if (queuedPacket != null)
this.dispatchPacket(queuedPacket.getPacket(), queuedPacket.getGenericFutureListener());
}
}
// Akarin end
public void sendPacket(Packet<?> packet, @Nullable GenericFutureListener<? extends Future<? super Void>> genericfuturelistener) {
if (this.isConnected() && this.sendPacketQueue() && !(packet instanceof PacketPlayOutMapChunk && !((PacketPlayOutMapChunk) packet).isReady())) { // Paper - Async-Anti-Xray - Add chunk packets which are not ready or all packets if the packet queue contains chunk packets which are not ready to the packet queue and send the packets later in the right order
if (this.isConnected() /*&& this.sendPacketQueue() && !(packet instanceof PacketPlayOutMapChunk && !((PacketPlayOutMapChunk) packet).isReady())*/) { // Paper - Async-Anti-Xray - Add chunk packets which are not ready or all packets if the packet queue contains chunk packets which are not ready to the packet queue and send the packets later in the right order // Akarin
//this.o(); // Paper - Async-Anti-Xray - Move to if statement (this.sendPacketQueue())
this.b(packet, genericfuturelistener);
} else {
// Akarin start
this.j.readLock().lock();
try {
// Send queued packets
this.sendPacketQueueUnsafe();
// Dispatch or queue new packets
boolean dispatch = packet instanceof PacketStatusOutPong || packet instanceof PacketStatusOutServerInfo || (packet instanceof PacketPlayOutMapChunk && ((PacketPlayOutMapChunk) packet).isReady());
if (dispatch)
this.dispatchPacket(packet, genericfuturelistener);
else {
this.packetQueue.add(new QueuedPacket(packet, genericfuturelistener));
}
} finally {
this.j.readLock().unlock();
}
//this.b(packet, genericfuturelistener);
} else if (false) {
// Akarin end
this.j.writeLock().lock();
try {
@@ -188,7 +239,7 @@ public class NetworkManager extends SimpleChannelInboundHandler<Packet<?>> {
EnumProtocol enumprotocol = EnumProtocol.a(packet);
EnumProtocol enumprotocol1 = (EnumProtocol) this.channel.attr(NetworkManager.c).get();
++this.r;
//++this.r; // Akarin - meaningless
if (enumprotocol1 != enumprotocol) {
NetworkManager.g.debug("Disabled auto read");
this.channel.config().setAutoRead(false);