diff --git a/sources/src/main/java/io/akarin/api/Akari.java b/sources/src/main/java/io/akarin/api/Akari.java index ea886603b..bf87cd6ce 100644 --- a/sources/src/main/java/io/akarin/api/Akari.java +++ b/sources/src/main/java/io/akarin/api/Akari.java @@ -44,7 +44,7 @@ public abstract class Akari { /* * The unsafe */ - public static sun.misc.Unsafe UNSAFE = getUnsafe(); + public final static sun.misc.Unsafe UNSAFE = getUnsafe(); private static sun.misc.Unsafe getUnsafe() { try { @@ -60,15 +60,15 @@ public abstract class Akari { /* * Timings */ - public static Timing worldTiming = getTiming("Akarin - World"); + public final static Timing worldTiming = getTiming("Akarin - World"); - public static Timing callbackTiming = getTiming("Akarin - Callback"); + public final static Timing callbackTiming = getTiming("Akarin - Callback"); private static Timing getTiming(String name) { try { Method ofSafe = Timings.class.getDeclaredMethod("ofSafe", String.class); ofSafe.setAccessible(true); - return worldTiming = (Timing) ofSafe.invoke(null, name); + return (Timing) ofSafe.invoke(null, name); } catch (Throwable t) { t.printStackTrace(); return null; diff --git a/sources/src/main/java/io/akarin/api/CheckedConcurrentLinkedQueue.java b/sources/src/main/java/io/akarin/api/CheckedConcurrentLinkedQueue.java index 22be3cc93..528a18076 100644 --- a/sources/src/main/java/io/akarin/api/CheckedConcurrentLinkedQueue.java +++ b/sources/src/main/java/io/akarin/api/CheckedConcurrentLinkedQueue.java @@ -35,8 +35,6 @@ package io.akarin.api; -import java.lang.reflect.Field; -import java.lang.reflect.Method; import java.util.AbstractQueue; import java.util.ArrayList; import java.util.Collection; @@ -47,10 +45,6 @@ import java.util.Spliterator; import java.util.Spliterators; import java.util.function.Consumer; -import io.akarin.server.core.AkarinGlobalConfig; -import net.minecraft.server.PacketPlayOutMapChunk; -import net.minecraft.server.NetworkManager.QueuedPacket; - /** * An unbounded thread-safe {@linkplain Queue queue} based on linked nodes. * This queue orders elements FIFO (first-in-first-out). @@ -106,11 +100,38 @@ import net.minecraft.server.NetworkManager.QueuedPacket; * * @since 1.5 * @author Doug Lea - * @param the type of elements held in this collection + * @param the type of elements held in this collection */ -public class CheckedConcurrentLinkedQueue extends AbstractQueue - implements Queue, java.io.Serializable { +public class CheckedConcurrentLinkedQueue extends AbstractQueue + implements Queue, java.io.Serializable { private static final long serialVersionUID = 196745693267521676L; + + public E poll(com.google.common.base.Predicate predicate, E singalInstance) { + restartFromHead: + for (;;) { + for (Node h = head, p = h, q;;) { + E item = p.item; + + if (predicate.apply(item)) return singalInstance; // Test predicate + + if (item != null && p.casItem(item, null)) { + // Successful CAS is the linearization point + // for item to be removed from this queue. + if (p != h) // hop two nodes at a time + updateHead(h, ((q = p.next) != null) ? q : p); + return item; + } + else if ((q = p.next) == null) { + updateHead(h, p); + return null; + } + else if (p == q) + continue restartFromHead; + else + p = q; + } + } + } /* * This is a modification of the Michael & Scott algorithm, @@ -215,7 +236,7 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue static { try { - UNSAFE = Akari.UNSAFE; // Akarin + UNSAFE = Akari.UNSAFE; Class k = Node.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); @@ -239,7 +260,7 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue * - it is permitted for tail to lag behind head, that is, for tail * to not be reachable from head! */ - private transient volatile Node head; + private transient volatile Node head; /** * A node from which the last node on list (that is, the unique @@ -253,13 +274,13 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue * to not be reachable from head! * - tail.next may or may not be self-pointing to tail. */ - private transient volatile Node tail; + private transient volatile Node tail; /** * Creates a {@code ConcurrentLinkedQueue} that is initially empty. */ public CheckedConcurrentLinkedQueue() { - head = tail = new Node(null); + head = tail = new Node(null); } /** @@ -271,11 +292,11 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue * @throws NullPointerException if the specified collection or any * of its elements are null */ - public CheckedConcurrentLinkedQueue(Collection c) { - Node h = null, t = null; - for (QueuedPacket e : c) { + public CheckedConcurrentLinkedQueue(Collection c) { + Node h = null, t = null; + for (E e : c) { checkNotNull(e); - Node newNode = new Node(e); + Node newNode = new Node(e); if (h == null) h = t = newNode; else { @@ -284,7 +305,7 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue } } if (h == null) - h = t = new Node(null); + h = t = new Node(null); head = h; tail = t; } @@ -300,7 +321,7 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue * @throws NullPointerException if the specified element is null */ @Override - public boolean add(QueuedPacket e) { + public boolean add(E e) { return offer(e); } @@ -308,7 +329,7 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue * Tries to CAS head to p. If successful, repoint old head to itself * as sentinel for succ(), below. */ - final void updateHead(Node h, Node p) { + final void updateHead(Node h, Node p) { if (h != p && casHead(h, p)) h.lazySetNext(h); } @@ -318,8 +339,8 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue * linked to self, which will only be true if traversing with a * stale pointer that is now off the list. */ - final Node succ(Node p) { - Node next = p.next; + final Node succ(Node p) { + Node next = p.next; return (p == next) ? head : next; } @@ -331,12 +352,12 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue * @throws NullPointerException if the specified element is null */ @Override - public boolean offer(QueuedPacket e) { + public boolean offer(E e) { checkNotNull(e); - final Node newNode = new Node(e); + final Node newNode = new Node(e); - for (Node t = tail, p = t;;) { - Node q = p.next; + for (Node t = tail, p = t;;) { + Node q = p.next; if (q == null) { // p is last node if (p.casNext(null, newNode)) { @@ -362,12 +383,12 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue } @Override - public QueuedPacket poll() { + public E poll() { restartFromHead: for (;;) { - for (Node h = head, p = h, q;;) { - QueuedPacket item = p.item; - + for (Node h = head, p = h, q;;) { + E item = p.item; + if (item != null && p.casItem(item, null)) { // Successful CAS is the linearization point // for item to be removed from this queue. @@ -386,46 +407,13 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue } } } - - // Akarin start - Add checked poll - public static QueuedPacket emptyPacket = new QueuedPacket(null, null); - - public QueuedPacket checkedPoll() { - restartFromHead: - for (;;) { - for (Node h = head, p = h, q;;) { - QueuedPacket item = p.item; - // - if (item.getPacket() instanceof PacketPlayOutMapChunk && !((PacketPlayOutMapChunk) item.getPacket()).isReady()) { // Check if the peeked packet is a chunk packet which is not ready - return emptyPacket; // Return false if the peeked packet is a chunk packet which is not ready - } - // - if (item != null && p.casItem(item, null)) { - // Successful CAS is the linearization point - // for item to be removed from this queue. - if (p != h) // hop two nodes at a time - updateHead(h, ((q = p.next) != null) ? q : p); - return item; - } - else if ((q = p.next) == null) { - updateHead(h, p); - return null; - } - else if (p == q) - continue restartFromHead; - else - p = q; - } - } - } - // Akarin end @Override - public QueuedPacket peek() { + public E peek() { restartFromHead: for (;;) { - for (Node h = head, p = h, q;;) { - QueuedPacket item = p.item; + for (Node h = head, p = h, q;;) { + E item = p.item; if (item != null || (q = p.next) == null) { updateHead(h, p); return item; @@ -446,10 +434,10 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue * and the need to add a retry loop to deal with the possibility * of losing a race to a concurrent poll(). */ - Node first() { + Node first() { restartFromHead: for (;;) { - for (Node h = head, p = h, q;;) { + for (Node h = head, p = h, q;;) { boolean hasItem = (p.item != null); if (hasItem || (q = p.next) == null) { updateHead(h, p); @@ -492,7 +480,7 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue @Override public int size() { int count = 0; - for (Node p = first(); p != null; p = succ(p)) + for (Node p = first(); p != null; p = succ(p)) if (p.item != null) // Collection.size() spec says to max out if (++count == Integer.MAX_VALUE) @@ -511,8 +499,8 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue @Override public boolean contains(Object o) { if (o == null) return false; - for (Node p = first(); p != null; p = succ(p)) { - QueuedPacket item = p.item; + for (Node p = first(); p != null; p = succ(p)) { + E item = p.item; if (item != null && o.equals(item)) return true; } @@ -533,10 +521,10 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue @Override public boolean remove(Object o) { if (o != null) { - Node next, pred = null; - for (Node p = first(); p != null; pred = p, p = next) { + Node next, pred = null; + for (Node p = first(); p != null; pred = p, p = next) { boolean removed = false; - QueuedPacket item = p.item; + E item = p.item; if (item != null) { if (!o.equals(item)) { next = succ(p); @@ -568,16 +556,16 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue * @throws IllegalArgumentException if the collection is this queue */ @Override - public boolean addAll(Collection c) { + public boolean addAll(Collection c) { if (c == this) // As historically specified in AbstractQueue#addAll throw new IllegalArgumentException(); // Copy c into a private chain of Nodes - Node beginningOfTheEnd = null, last = null; - for (QueuedPacket e : c) { + Node beginningOfTheEnd = null, last = null; + for (E e : c) { checkNotNull(e); - Node newNode = new Node(e); + Node newNode = new Node(e); if (beginningOfTheEnd == null) beginningOfTheEnd = last = newNode; else { @@ -589,8 +577,8 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue return false; // Atomically append the chain at the tail of this collection - for (Node t = tail, p = t;;) { - Node q = p.next; + for (Node t = tail, p = t;;) { + Node q = p.next; if (q == null) { // p is last node if (p.casNext(null, beginningOfTheEnd)) { @@ -635,9 +623,9 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue @Override public Object[] toArray() { // Use ArrayList to deal with resizing. - ArrayList al = new ArrayList(); - for (Node p = first(); p != null; p = succ(p)) { - QueuedPacket item = p.item; + ArrayList al = new ArrayList(); + for (Node p = first(); p != null; p = succ(p)) { + E item = p.item; if (item != null) al.add(item); } @@ -684,9 +672,9 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue public T[] toArray(T[] a) { // try to use sent-in array int k = 0; - Node p; + Node p; for (p = first(); p != null && k < a.length; p = succ(p)) { - QueuedPacket item = p.item; + E item = p.item; if (item != null) a[k++] = (T)item; } @@ -697,9 +685,9 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue } // If won't fit, use ArrayList version - ArrayList al = new ArrayList(); - for (Node q = first(); q != null; q = succ(q)) { - QueuedPacket item = q.item; + ArrayList al = new ArrayList(); + for (Node q = first(); q != null; q = succ(q)) { + E item = q.item; if (item != null) al.add(item); } @@ -716,15 +704,15 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue * @return an iterator over the elements in this queue in proper sequence */ @Override - public Iterator iterator() { + public Iterator iterator() { return new Itr(); } - private class Itr implements Iterator { + private class Itr implements Iterator { /** * Next node to return item for. */ - private Node nextNode; + private Node nextNode; /** * nextItem holds on to item fields because once we claim @@ -732,12 +720,12 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue * the following next() call even if it was in the process of * being removed when hasNext() was called. */ - private QueuedPacket nextItem; + private E nextItem; /** * Node of the last returned item, to support remove. */ - private Node lastRet; + private Node lastRet; Itr() { advance(); @@ -747,11 +735,11 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue * Moves to next valid node and returns item to return for * next(), or null if no such. */ - private QueuedPacket advance() { + private E advance() { lastRet = nextNode; - QueuedPacket x = nextItem; + E x = nextItem; - Node pred, p; + Node pred, p; if (nextNode == null) { p = first(); pred = null; @@ -766,14 +754,14 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue nextItem = null; return x; } - QueuedPacket item = p.item; + E item = p.item; if (item != null) { nextNode = p; nextItem = item; return x; } else { // skip over nulls - Node next = succ(p); + Node next = succ(p); if (pred != null && next != null) pred.casNext(p, next); p = next; @@ -787,14 +775,14 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue } @Override - public QueuedPacket next() { + public E next() { if (nextNode == null) throw new NoSuchElementException(); return advance(); } @Override public void remove() { - Node l = lastRet; + Node l = lastRet; if (l == null) throw new IllegalStateException(); // rely on a future traversal to relink. l.item = null; @@ -817,7 +805,7 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue s.defaultWriteObject(); // Write out all elements in the proper order. - for (Node p = first(); p != null; p = succ(p)) { + for (Node p = first(); p != null; p = succ(p)) { Object item = p.item; if (item != null) s.writeObject(item); @@ -839,11 +827,11 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue s.defaultReadObject(); // Read in elements until trailing null sentinel found - Node h = null, t = null; + Node h = null, t = null; Object item; while ((item = s.readObject()) != null) { @SuppressWarnings("unchecked") - Node newNode = new Node((QueuedPacket) item); + Node newNode = new Node((E) item); if (h == null) h = t = newNode; else { @@ -852,26 +840,26 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue } } if (h == null) - h = t = new Node(null); + h = t = new Node(null); head = h; tail = t; } /** A customized variant of Spliterators.IteratorSpliterator */ - static final class CLQSpliterator implements Spliterator { + static final class CLQSpliterator implements Spliterator { static final int MAX_BATCH = 1 << 25; // max batch array size; - final CheckedConcurrentLinkedQueue queue; - Node current; // current node; null until initialized + final CheckedConcurrentLinkedQueue queue; + Node current; // current node; null until initialized int batch; // batch size for splits boolean exhausted; // true when no more nodes - CLQSpliterator(CheckedConcurrentLinkedQueue queue) { + CLQSpliterator(CheckedConcurrentLinkedQueue queue) { this.queue = queue; } @Override - public Spliterator trySplit() { - Node p; - final CheckedConcurrentLinkedQueue q = this.queue; + public Spliterator trySplit() { + Node p; + final CheckedConcurrentLinkedQueue q = this.queue; int b = batch; int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1; if (!exhausted && @@ -898,15 +886,15 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue } @Override - public void forEachRemaining(Consumer action) { - Node p; + public void forEachRemaining(Consumer action) { + Node p; if (action == null) throw new NullPointerException(); - final CheckedConcurrentLinkedQueue q = this.queue; + final CheckedConcurrentLinkedQueue q = this.queue; if (!exhausted && ((p = current) != null || (p = q.first()) != null)) { exhausted = true; do { - QueuedPacket e = p.item; + E e = p.item; if (p == (p = p.next)) p = q.first(); if (e != null) @@ -916,13 +904,13 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue } @Override - public boolean tryAdvance(Consumer action) { - Node p; + public boolean tryAdvance(Consumer action) { + Node p; if (action == null) throw new NullPointerException(); - final CheckedConcurrentLinkedQueue q = this.queue; + final CheckedConcurrentLinkedQueue q = this.queue; if (!exhausted && ((p = current) != null || (p = q.first()) != null)) { - QueuedPacket e; + E e; do { e = p.item; if (p == (p = p.next)) @@ -965,8 +953,8 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue * @since 1.8 */ @Override - public Spliterator spliterator() { - return new CLQSpliterator(this); + public Spliterator spliterator() { + return new CLQSpliterator(this); } /** @@ -979,11 +967,11 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue throw new NullPointerException(); } - private boolean casTail(Node cmp, Node val) { + private boolean casTail(Node cmp, Node val) { return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val); } - private boolean casHead(Node cmp, Node val) { + private boolean casHead(Node cmp, Node val) { return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val); } @@ -994,7 +982,7 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue private static final long tailOffset; static { try { - UNSAFE = Akari.UNSAFE; // Akarin + UNSAFE = Akari.UNSAFE; Class k = CheckedConcurrentLinkedQueue.class; headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head")); diff --git a/sources/src/main/java/io/akarin/server/mixin/core/OptimisticNetworkManager.java b/sources/src/main/java/io/akarin/server/mixin/core/OptimisticNetworkManager.java index 129c99db4..01e9c343b 100644 --- a/sources/src/main/java/io/akarin/server/mixin/core/OptimisticNetworkManager.java +++ b/sources/src/main/java/io/akarin/server/mixin/core/OptimisticNetworkManager.java @@ -8,12 +8,16 @@ import org.spongepowered.asm.mixin.Mixin; import org.spongepowered.asm.mixin.Overwrite; import org.spongepowered.asm.mixin.Shadow; +import com.google.common.base.Predicate; + import io.akarin.api.CheckedConcurrentLinkedQueue; import io.netty.channel.Channel; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import net.minecraft.server.NetworkManager; +import net.minecraft.server.NetworkManager.QueuedPacket; import net.minecraft.server.Packet; +import net.minecraft.server.PacketPlayOutMapChunk; @Mixin(value = NetworkManager.class, remap = false) public class OptimisticNetworkManager { @@ -24,6 +28,16 @@ public class OptimisticNetworkManager { @Shadow private Queue getPacketQueue() { return null; } @Shadow private void dispatchPacket(Packet packet, GenericFutureListener>[] genericFutureListeners) {} + private static final Predicate IS_CHUNK_READY = new Predicate() { + @Override + public boolean apply(QueuedPacket item) { + Packet packet = item.getPacket(); + return !(packet instanceof PacketPlayOutMapChunk && !((PacketPlayOutMapChunk) packet).isReady()); // Check if the peeked packet is a chunk packet which is not ready + } + }; + + private static final QueuedPacket SIGNAL_PACKET = new QueuedPacket(null, null); + @Overwrite private boolean m() { if (this.channel != null && this.channel.isOpen()) { @@ -34,10 +48,10 @@ public class OptimisticNetworkManager { this.j.readLock().lock(); try { while (!this.i.isEmpty()) { - NetworkManager.QueuedPacket packet = ((CheckedConcurrentLinkedQueue) getPacketQueue()).checkedPoll(); + NetworkManager.QueuedPacket packet = ((CheckedConcurrentLinkedQueue) getPacketQueue()).poll(IS_CHUNK_READY, SIGNAL_PACKET); if (packet != null) { // Fix NPE (Spigot bug caused by handleDisconnection()) - if (packet == CheckedConcurrentLinkedQueue.emptyPacket) { // Check if the peeked packet is a chunk packet which is not ready + if (packet == SIGNAL_PACKET) { return false; // Return false if the peeked packet is a chunk packet which is not ready } else { dispatchPacket(packet.getPacket(), packet.getGenericFutureListeners()); // dispatch the packet diff --git a/sources/src/main/java/net/minecraft/server/NetworkManager.java b/sources/src/main/java/net/minecraft/server/NetworkManager.java index 8c71f7599..a67a2ac09 100644 --- a/sources/src/main/java/net/minecraft/server/NetworkManager.java +++ b/sources/src/main/java/net/minecraft/server/NetworkManager.java @@ -74,7 +74,7 @@ public class NetworkManager extends SimpleChannelInboundHandler> { } }; private final EnumProtocolDirection h; - private final Queue i = new CheckedConcurrentLinkedQueue(); private final Queue getPacketQueue() { return this.i; } // Paper - Anti-Xray - OBFHELPER // Akarin + private final Queue i = new CheckedConcurrentLinkedQueue(); private final Queue getPacketQueue() { return this.i; } // Paper - Anti-Xray - OBFHELPER // Akarin private final ReentrantReadWriteLock j = new ReentrantReadWriteLock(); public Channel channel; // Spigot Start // PAIL