Fixes Akari timings w/ Make CheckConcurrentLinkedQueue common

This commit is contained in:
Sotr
2018-06-09 16:06:33 +08:00
parent 0771b4a367
commit 072923af35
4 changed files with 133 additions and 131 deletions

View File

@@ -44,7 +44,7 @@ public abstract class Akari {
/* /*
* The unsafe * The unsafe
*/ */
public static sun.misc.Unsafe UNSAFE = getUnsafe(); public final static sun.misc.Unsafe UNSAFE = getUnsafe();
private static sun.misc.Unsafe getUnsafe() { private static sun.misc.Unsafe getUnsafe() {
try { try {
@@ -60,15 +60,15 @@ public abstract class Akari {
/* /*
* Timings * Timings
*/ */
public static Timing worldTiming = getTiming("Akarin - World"); public final static Timing worldTiming = getTiming("Akarin - World");
public static Timing callbackTiming = getTiming("Akarin - Callback"); public final static Timing callbackTiming = getTiming("Akarin - Callback");
private static Timing getTiming(String name) { private static Timing getTiming(String name) {
try { try {
Method ofSafe = Timings.class.getDeclaredMethod("ofSafe", String.class); Method ofSafe = Timings.class.getDeclaredMethod("ofSafe", String.class);
ofSafe.setAccessible(true); ofSafe.setAccessible(true);
return worldTiming = (Timing) ofSafe.invoke(null, name); return (Timing) ofSafe.invoke(null, name);
} catch (Throwable t) { } catch (Throwable t) {
t.printStackTrace(); t.printStackTrace();
return null; return null;

View File

@@ -35,8 +35,6 @@
package io.akarin.api; package io.akarin.api;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.AbstractQueue; import java.util.AbstractQueue;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
@@ -47,10 +45,6 @@ import java.util.Spliterator;
import java.util.Spliterators; import java.util.Spliterators;
import java.util.function.Consumer; import java.util.function.Consumer;
import io.akarin.server.core.AkarinGlobalConfig;
import net.minecraft.server.PacketPlayOutMapChunk;
import net.minecraft.server.NetworkManager.QueuedPacket;
/** /**
* An unbounded thread-safe {@linkplain Queue queue} based on linked nodes. * An unbounded thread-safe {@linkplain Queue queue} based on linked nodes.
* This queue orders elements FIFO (first-in-first-out). * This queue orders elements FIFO (first-in-first-out).
@@ -106,12 +100,39 @@ import net.minecraft.server.NetworkManager.QueuedPacket;
* *
* @since 1.5 * @since 1.5
* @author Doug Lea * @author Doug Lea
* @param <QueuedPacket> the type of elements held in this collection * @param <E> the type of elements held in this collection
*/ */
public class CheckedConcurrentLinkedQueue extends AbstractQueue<QueuedPacket> public class CheckedConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<QueuedPacket>, java.io.Serializable { implements Queue<E>, java.io.Serializable {
private static final long serialVersionUID = 196745693267521676L; private static final long serialVersionUID = 196745693267521676L;
public E poll(com.google.common.base.Predicate<E> predicate, E singalInstance) {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (predicate.apply(item)) return singalInstance; // Test predicate
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
/* /*
* This is a modification of the Michael & Scott algorithm, * This is a modification of the Michael & Scott algorithm,
* adapted for a garbage-collected environment, with support for * adapted for a garbage-collected environment, with support for
@@ -215,7 +236,7 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue<QueuedPacket>
static { static {
try { try {
UNSAFE = Akari.UNSAFE; // Akarin UNSAFE = Akari.UNSAFE;
Class<?> k = Node.class; Class<?> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item")); (k.getDeclaredField("item"));
@@ -239,7 +260,7 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue<QueuedPacket>
* - it is permitted for tail to lag behind head, that is, for tail * - it is permitted for tail to lag behind head, that is, for tail
* to not be reachable from head! * to not be reachable from head!
*/ */
private transient volatile Node<QueuedPacket> head; private transient volatile Node<E> head;
/** /**
* A node from which the last node on list (that is, the unique * A node from which the last node on list (that is, the unique
@@ -253,13 +274,13 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue<QueuedPacket>
* to not be reachable from head! * to not be reachable from head!
* - tail.next may or may not be self-pointing to tail. * - tail.next may or may not be self-pointing to tail.
*/ */
private transient volatile Node<QueuedPacket> tail; private transient volatile Node<E> tail;
/** /**
* Creates a {@code ConcurrentLinkedQueue} that is initially empty. * Creates a {@code ConcurrentLinkedQueue} that is initially empty.
*/ */
public CheckedConcurrentLinkedQueue() { public CheckedConcurrentLinkedQueue() {
head = tail = new Node<QueuedPacket>(null); head = tail = new Node<E>(null);
} }
/** /**
@@ -271,11 +292,11 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue<QueuedPacket>
* @throws NullPointerException if the specified collection or any * @throws NullPointerException if the specified collection or any
* of its elements are null * of its elements are null
*/ */
public CheckedConcurrentLinkedQueue(Collection<? extends QueuedPacket> c) { public CheckedConcurrentLinkedQueue(Collection<? extends E> c) {
Node<QueuedPacket> h = null, t = null; Node<E> h = null, t = null;
for (QueuedPacket e : c) { for (E e : c) {
checkNotNull(e); checkNotNull(e);
Node<QueuedPacket> newNode = new Node<QueuedPacket>(e); Node<E> newNode = new Node<E>(e);
if (h == null) if (h == null)
h = t = newNode; h = t = newNode;
else { else {
@@ -284,7 +305,7 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue<QueuedPacket>
} }
} }
if (h == null) if (h == null)
h = t = new Node<QueuedPacket>(null); h = t = new Node<E>(null);
head = h; head = h;
tail = t; tail = t;
} }
@@ -300,7 +321,7 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue<QueuedPacket>
* @throws NullPointerException if the specified element is null * @throws NullPointerException if the specified element is null
*/ */
@Override @Override
public boolean add(QueuedPacket e) { public boolean add(E e) {
return offer(e); return offer(e);
} }
@@ -308,7 +329,7 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue<QueuedPacket>
* Tries to CAS head to p. If successful, repoint old head to itself * Tries to CAS head to p. If successful, repoint old head to itself
* as sentinel for succ(), below. * as sentinel for succ(), below.
*/ */
final void updateHead(Node<QueuedPacket> h, Node<QueuedPacket> p) { final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p)) if (h != p && casHead(h, p))
h.lazySetNext(h); h.lazySetNext(h);
} }
@@ -318,8 +339,8 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue<QueuedPacket>
* linked to self, which will only be true if traversing with a * linked to self, which will only be true if traversing with a
* stale pointer that is now off the list. * stale pointer that is now off the list.
*/ */
final Node<QueuedPacket> succ(Node<QueuedPacket> p) { final Node<E> succ(Node<E> p) {
Node<QueuedPacket> next = p.next; Node<E> next = p.next;
return (p == next) ? head : next; return (p == next) ? head : next;
} }
@@ -331,12 +352,12 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue<QueuedPacket>
* @throws NullPointerException if the specified element is null * @throws NullPointerException if the specified element is null
*/ */
@Override @Override
public boolean offer(QueuedPacket e) { public boolean offer(E e) {
checkNotNull(e); checkNotNull(e);
final Node<QueuedPacket> newNode = new Node<QueuedPacket>(e); final Node<E> newNode = new Node<E>(e);
for (Node<QueuedPacket> t = tail, p = t;;) { for (Node<E> t = tail, p = t;;) {
Node<QueuedPacket> q = p.next; Node<E> q = p.next;
if (q == null) { if (q == null) {
// p is last node // p is last node
if (p.casNext(null, newNode)) { if (p.casNext(null, newNode)) {
@@ -362,11 +383,11 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue<QueuedPacket>
} }
@Override @Override
public QueuedPacket poll() { public E poll() {
restartFromHead: restartFromHead:
for (;;) { for (;;) {
for (Node<QueuedPacket> h = head, p = h, q;;) { for (Node<E> h = head, p = h, q;;) {
QueuedPacket item = p.item; E item = p.item;
if (item != null && p.casItem(item, null)) { if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point // Successful CAS is the linearization point
@@ -387,45 +408,12 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue<QueuedPacket>
} }
} }
// Akarin start - Add checked poll
public static QueuedPacket emptyPacket = new QueuedPacket(null, null);
public QueuedPacket checkedPoll() {
restartFromHead:
for (;;) {
for (Node<QueuedPacket> h = head, p = h, q;;) {
QueuedPacket item = p.item;
//
if (item.getPacket() instanceof PacketPlayOutMapChunk && !((PacketPlayOutMapChunk) item.getPacket()).isReady()) { // Check if the peeked packet is a chunk packet which is not ready
return emptyPacket; // Return false if the peeked packet is a chunk packet which is not ready
}
//
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
// Akarin end
@Override @Override
public QueuedPacket peek() { public E peek() {
restartFromHead: restartFromHead:
for (;;) { for (;;) {
for (Node<QueuedPacket> h = head, p = h, q;;) { for (Node<E> h = head, p = h, q;;) {
QueuedPacket item = p.item; E item = p.item;
if (item != null || (q = p.next) == null) { if (item != null || (q = p.next) == null) {
updateHead(h, p); updateHead(h, p);
return item; return item;
@@ -446,10 +434,10 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue<QueuedPacket>
* and the need to add a retry loop to deal with the possibility * and the need to add a retry loop to deal with the possibility
* of losing a race to a concurrent poll(). * of losing a race to a concurrent poll().
*/ */
Node<QueuedPacket> first() { Node<E> first() {
restartFromHead: restartFromHead:
for (;;) { for (;;) {
for (Node<QueuedPacket> h = head, p = h, q;;) { for (Node<E> h = head, p = h, q;;) {
boolean hasItem = (p.item != null); boolean hasItem = (p.item != null);
if (hasItem || (q = p.next) == null) { if (hasItem || (q = p.next) == null) {
updateHead(h, p); updateHead(h, p);
@@ -492,7 +480,7 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue<QueuedPacket>
@Override @Override
public int size() { public int size() {
int count = 0; int count = 0;
for (Node<QueuedPacket> p = first(); p != null; p = succ(p)) for (Node<E> p = first(); p != null; p = succ(p))
if (p.item != null) if (p.item != null)
// Collection.size() spec says to max out // Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE) if (++count == Integer.MAX_VALUE)
@@ -511,8 +499,8 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue<QueuedPacket>
@Override @Override
public boolean contains(Object o) { public boolean contains(Object o) {
if (o == null) return false; if (o == null) return false;
for (Node<QueuedPacket> p = first(); p != null; p = succ(p)) { for (Node<E> p = first(); p != null; p = succ(p)) {
QueuedPacket item = p.item; E item = p.item;
if (item != null && o.equals(item)) if (item != null && o.equals(item))
return true; return true;
} }
@@ -533,10 +521,10 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue<QueuedPacket>
@Override @Override
public boolean remove(Object o) { public boolean remove(Object o) {
if (o != null) { if (o != null) {
Node<QueuedPacket> next, pred = null; Node<E> next, pred = null;
for (Node<QueuedPacket> p = first(); p != null; pred = p, p = next) { for (Node<E> p = first(); p != null; pred = p, p = next) {
boolean removed = false; boolean removed = false;
QueuedPacket item = p.item; E item = p.item;
if (item != null) { if (item != null) {
if (!o.equals(item)) { if (!o.equals(item)) {
next = succ(p); next = succ(p);
@@ -568,16 +556,16 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue<QueuedPacket>
* @throws IllegalArgumentException if the collection is this queue * @throws IllegalArgumentException if the collection is this queue
*/ */
@Override @Override
public boolean addAll(Collection<? extends QueuedPacket> c) { public boolean addAll(Collection<? extends E> c) {
if (c == this) if (c == this)
// As historically specified in AbstractQueue#addAll // As historically specified in AbstractQueue#addAll
throw new IllegalArgumentException(); throw new IllegalArgumentException();
// Copy c into a private chain of Nodes // Copy c into a private chain of Nodes
Node<QueuedPacket> beginningOfTheEnd = null, last = null; Node<E> beginningOfTheEnd = null, last = null;
for (QueuedPacket e : c) { for (E e : c) {
checkNotNull(e); checkNotNull(e);
Node<QueuedPacket> newNode = new Node<QueuedPacket>(e); Node<E> newNode = new Node<E>(e);
if (beginningOfTheEnd == null) if (beginningOfTheEnd == null)
beginningOfTheEnd = last = newNode; beginningOfTheEnd = last = newNode;
else { else {
@@ -589,8 +577,8 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue<QueuedPacket>
return false; return false;
// Atomically append the chain at the tail of this collection // Atomically append the chain at the tail of this collection
for (Node<QueuedPacket> t = tail, p = t;;) { for (Node<E> t = tail, p = t;;) {
Node<QueuedPacket> q = p.next; Node<E> q = p.next;
if (q == null) { if (q == null) {
// p is last node // p is last node
if (p.casNext(null, beginningOfTheEnd)) { if (p.casNext(null, beginningOfTheEnd)) {
@@ -635,9 +623,9 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue<QueuedPacket>
@Override @Override
public Object[] toArray() { public Object[] toArray() {
// Use ArrayList to deal with resizing. // Use ArrayList to deal with resizing.
ArrayList<QueuedPacket> al = new ArrayList<QueuedPacket>(); ArrayList<E> al = new ArrayList<E>();
for (Node<QueuedPacket> p = first(); p != null; p = succ(p)) { for (Node<E> p = first(); p != null; p = succ(p)) {
QueuedPacket item = p.item; E item = p.item;
if (item != null) if (item != null)
al.add(item); al.add(item);
} }
@@ -684,9 +672,9 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue<QueuedPacket>
public <T> T[] toArray(T[] a) { public <T> T[] toArray(T[] a) {
// try to use sent-in array // try to use sent-in array
int k = 0; int k = 0;
Node<QueuedPacket> p; Node<E> p;
for (p = first(); p != null && k < a.length; p = succ(p)) { for (p = first(); p != null && k < a.length; p = succ(p)) {
QueuedPacket item = p.item; E item = p.item;
if (item != null) if (item != null)
a[k++] = (T)item; a[k++] = (T)item;
} }
@@ -697,9 +685,9 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue<QueuedPacket>
} }
// If won't fit, use ArrayList version // If won't fit, use ArrayList version
ArrayList<QueuedPacket> al = new ArrayList<QueuedPacket>(); ArrayList<E> al = new ArrayList<E>();
for (Node<QueuedPacket> q = first(); q != null; q = succ(q)) { for (Node<E> q = first(); q != null; q = succ(q)) {
QueuedPacket item = q.item; E item = q.item;
if (item != null) if (item != null)
al.add(item); al.add(item);
} }
@@ -716,15 +704,15 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue<QueuedPacket>
* @return an iterator over the elements in this queue in proper sequence * @return an iterator over the elements in this queue in proper sequence
*/ */
@Override @Override
public Iterator<QueuedPacket> iterator() { public Iterator<E> iterator() {
return new Itr(); return new Itr();
} }
private class Itr implements Iterator<QueuedPacket> { private class Itr implements Iterator<E> {
/** /**
* Next node to return item for. * Next node to return item for.
*/ */
private Node<QueuedPacket> nextNode; private Node<E> nextNode;
/** /**
* nextItem holds on to item fields because once we claim * nextItem holds on to item fields because once we claim
@@ -732,12 +720,12 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue<QueuedPacket>
* the following next() call even if it was in the process of * the following next() call even if it was in the process of
* being removed when hasNext() was called. * being removed when hasNext() was called.
*/ */
private QueuedPacket nextItem; private E nextItem;
/** /**
* Node of the last returned item, to support remove. * Node of the last returned item, to support remove.
*/ */
private Node<QueuedPacket> lastRet; private Node<E> lastRet;
Itr() { Itr() {
advance(); advance();
@@ -747,11 +735,11 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue<QueuedPacket>
* Moves to next valid node and returns item to return for * Moves to next valid node and returns item to return for
* next(), or null if no such. * next(), or null if no such.
*/ */
private QueuedPacket advance() { private E advance() {
lastRet = nextNode; lastRet = nextNode;
QueuedPacket x = nextItem; E x = nextItem;
Node<QueuedPacket> pred, p; Node<E> pred, p;
if (nextNode == null) { if (nextNode == null) {
p = first(); p = first();
pred = null; pred = null;
@@ -766,14 +754,14 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue<QueuedPacket>
nextItem = null; nextItem = null;
return x; return x;
} }
QueuedPacket item = p.item; E item = p.item;
if (item != null) { if (item != null) {
nextNode = p; nextNode = p;
nextItem = item; nextItem = item;
return x; return x;
} else { } else {
// skip over nulls // skip over nulls
Node<QueuedPacket> next = succ(p); Node<E> next = succ(p);
if (pred != null && next != null) if (pred != null && next != null)
pred.casNext(p, next); pred.casNext(p, next);
p = next; p = next;
@@ -787,14 +775,14 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue<QueuedPacket>
} }
@Override @Override
public QueuedPacket next() { public E next() {
if (nextNode == null) throw new NoSuchElementException(); if (nextNode == null) throw new NoSuchElementException();
return advance(); return advance();
} }
@Override @Override
public void remove() { public void remove() {
Node<QueuedPacket> l = lastRet; Node<E> l = lastRet;
if (l == null) throw new IllegalStateException(); if (l == null) throw new IllegalStateException();
// rely on a future traversal to relink. // rely on a future traversal to relink.
l.item = null; l.item = null;
@@ -817,7 +805,7 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue<QueuedPacket>
s.defaultWriteObject(); s.defaultWriteObject();
// Write out all elements in the proper order. // Write out all elements in the proper order.
for (Node<QueuedPacket> p = first(); p != null; p = succ(p)) { for (Node<E> p = first(); p != null; p = succ(p)) {
Object item = p.item; Object item = p.item;
if (item != null) if (item != null)
s.writeObject(item); s.writeObject(item);
@@ -839,11 +827,11 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue<QueuedPacket>
s.defaultReadObject(); s.defaultReadObject();
// Read in elements until trailing null sentinel found // Read in elements until trailing null sentinel found
Node<QueuedPacket> h = null, t = null; Node<E> h = null, t = null;
Object item; Object item;
while ((item = s.readObject()) != null) { while ((item = s.readObject()) != null) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Node<QueuedPacket> newNode = new Node<QueuedPacket>((QueuedPacket) item); Node<E> newNode = new Node<E>((E) item);
if (h == null) if (h == null)
h = t = newNode; h = t = newNode;
else { else {
@@ -852,26 +840,26 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue<QueuedPacket>
} }
} }
if (h == null) if (h == null)
h = t = new Node<QueuedPacket>(null); h = t = new Node<E>(null);
head = h; head = h;
tail = t; tail = t;
} }
/** A customized variant of Spliterators.IteratorSpliterator */ /** A customized variant of Spliterators.IteratorSpliterator */
static final class CLQSpliterator implements Spliterator<QueuedPacket> { static final class CLQSpliterator<E> implements Spliterator<E> {
static final int MAX_BATCH = 1 << 25; // max batch array size; static final int MAX_BATCH = 1 << 25; // max batch array size;
final CheckedConcurrentLinkedQueue queue; final CheckedConcurrentLinkedQueue<E> queue;
Node<QueuedPacket> current; // current node; null until initialized Node<E> current; // current node; null until initialized
int batch; // batch size for splits int batch; // batch size for splits
boolean exhausted; // true when no more nodes boolean exhausted; // true when no more nodes
CLQSpliterator(CheckedConcurrentLinkedQueue queue) { CLQSpliterator(CheckedConcurrentLinkedQueue<E> queue) {
this.queue = queue; this.queue = queue;
} }
@Override @Override
public Spliterator<QueuedPacket> trySplit() { public Spliterator<E> trySplit() {
Node<QueuedPacket> p; Node<E> p;
final CheckedConcurrentLinkedQueue q = this.queue; final CheckedConcurrentLinkedQueue<E> q = this.queue;
int b = batch; int b = batch;
int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1; int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
if (!exhausted && if (!exhausted &&
@@ -898,15 +886,15 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue<QueuedPacket>
} }
@Override @Override
public void forEachRemaining(Consumer<? super QueuedPacket> action) { public void forEachRemaining(Consumer<? super E> action) {
Node<QueuedPacket> p; Node<E> p;
if (action == null) throw new NullPointerException(); if (action == null) throw new NullPointerException();
final CheckedConcurrentLinkedQueue q = this.queue; final CheckedConcurrentLinkedQueue<E> q = this.queue;
if (!exhausted && if (!exhausted &&
((p = current) != null || (p = q.first()) != null)) { ((p = current) != null || (p = q.first()) != null)) {
exhausted = true; exhausted = true;
do { do {
QueuedPacket e = p.item; E e = p.item;
if (p == (p = p.next)) if (p == (p = p.next))
p = q.first(); p = q.first();
if (e != null) if (e != null)
@@ -916,13 +904,13 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue<QueuedPacket>
} }
@Override @Override
public boolean tryAdvance(Consumer<? super QueuedPacket> action) { public boolean tryAdvance(Consumer<? super E> action) {
Node<QueuedPacket> p; Node<E> p;
if (action == null) throw new NullPointerException(); if (action == null) throw new NullPointerException();
final CheckedConcurrentLinkedQueue q = this.queue; final CheckedConcurrentLinkedQueue<E> q = this.queue;
if (!exhausted && if (!exhausted &&
((p = current) != null || (p = q.first()) != null)) { ((p = current) != null || (p = q.first()) != null)) {
QueuedPacket e; E e;
do { do {
e = p.item; e = p.item;
if (p == (p = p.next)) if (p == (p = p.next))
@@ -965,8 +953,8 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue<QueuedPacket>
* @since 1.8 * @since 1.8
*/ */
@Override @Override
public Spliterator<QueuedPacket> spliterator() { public Spliterator<E> spliterator() {
return new CLQSpliterator(this); return new CLQSpliterator<E>(this);
} }
/** /**
@@ -979,11 +967,11 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue<QueuedPacket>
throw new NullPointerException(); throw new NullPointerException();
} }
private boolean casTail(Node<QueuedPacket> cmp, Node<QueuedPacket> val) { private boolean casTail(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val); return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
} }
private boolean casHead(Node<QueuedPacket> cmp, Node<QueuedPacket> val) { private boolean casHead(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val); return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
} }
@@ -994,7 +982,7 @@ public class CheckedConcurrentLinkedQueue extends AbstractQueue<QueuedPacket>
private static final long tailOffset; private static final long tailOffset;
static { static {
try { try {
UNSAFE = Akari.UNSAFE; // Akarin UNSAFE = Akari.UNSAFE;
Class<?> k = CheckedConcurrentLinkedQueue.class; Class<?> k = CheckedConcurrentLinkedQueue.class;
headOffset = UNSAFE.objectFieldOffset headOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("head")); (k.getDeclaredField("head"));

View File

@@ -8,12 +8,16 @@ import org.spongepowered.asm.mixin.Mixin;
import org.spongepowered.asm.mixin.Overwrite; import org.spongepowered.asm.mixin.Overwrite;
import org.spongepowered.asm.mixin.Shadow; import org.spongepowered.asm.mixin.Shadow;
import com.google.common.base.Predicate;
import io.akarin.api.CheckedConcurrentLinkedQueue; import io.akarin.api.CheckedConcurrentLinkedQueue;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.GenericFutureListener;
import net.minecraft.server.NetworkManager; import net.minecraft.server.NetworkManager;
import net.minecraft.server.NetworkManager.QueuedPacket;
import net.minecraft.server.Packet; import net.minecraft.server.Packet;
import net.minecraft.server.PacketPlayOutMapChunk;
@Mixin(value = NetworkManager.class, remap = false) @Mixin(value = NetworkManager.class, remap = false)
public class OptimisticNetworkManager { public class OptimisticNetworkManager {
@@ -24,6 +28,16 @@ public class OptimisticNetworkManager {
@Shadow private Queue<NetworkManager.QueuedPacket> getPacketQueue() { return null; } @Shadow private Queue<NetworkManager.QueuedPacket> getPacketQueue() { return null; }
@Shadow private void dispatchPacket(Packet<?> packet, GenericFutureListener<? extends Future<? super Void>>[] genericFutureListeners) {} @Shadow private void dispatchPacket(Packet<?> packet, GenericFutureListener<? extends Future<? super Void>>[] genericFutureListeners) {}
private static final Predicate<QueuedPacket> IS_CHUNK_READY = new Predicate<QueuedPacket>() {
@Override
public boolean apply(QueuedPacket item) {
Packet<?> packet = item.getPacket();
return !(packet instanceof PacketPlayOutMapChunk && !((PacketPlayOutMapChunk) packet).isReady()); // Check if the peeked packet is a chunk packet which is not ready
}
};
private static final QueuedPacket SIGNAL_PACKET = new QueuedPacket(null, null);
@Overwrite @Overwrite
private boolean m() { private boolean m() {
if (this.channel != null && this.channel.isOpen()) { if (this.channel != null && this.channel.isOpen()) {
@@ -34,10 +48,10 @@ public class OptimisticNetworkManager {
this.j.readLock().lock(); this.j.readLock().lock();
try { try {
while (!this.i.isEmpty()) { while (!this.i.isEmpty()) {
NetworkManager.QueuedPacket packet = ((CheckedConcurrentLinkedQueue) getPacketQueue()).checkedPoll(); NetworkManager.QueuedPacket packet = ((CheckedConcurrentLinkedQueue<QueuedPacket>) getPacketQueue()).poll(IS_CHUNK_READY, SIGNAL_PACKET);
if (packet != null) { // Fix NPE (Spigot bug caused by handleDisconnection()) if (packet != null) { // Fix NPE (Spigot bug caused by handleDisconnection())
if (packet == CheckedConcurrentLinkedQueue.emptyPacket) { // Check if the peeked packet is a chunk packet which is not ready if (packet == SIGNAL_PACKET) {
return false; // Return false if the peeked packet is a chunk packet which is not ready return false; // Return false if the peeked packet is a chunk packet which is not ready
} else { } else {
dispatchPacket(packet.getPacket(), packet.getGenericFutureListeners()); // dispatch the packet dispatchPacket(packet.getPacket(), packet.getGenericFutureListeners()); // dispatch the packet

View File

@@ -74,7 +74,7 @@ public class NetworkManager extends SimpleChannelInboundHandler<Packet<?>> {
} }
}; };
private final EnumProtocolDirection h; private final EnumProtocolDirection h;
private final Queue<NetworkManager.QueuedPacket> i = new CheckedConcurrentLinkedQueue(); private final Queue<NetworkManager.QueuedPacket> getPacketQueue() { return this.i; } // Paper - Anti-Xray - OBFHELPER // Akarin private final Queue<NetworkManager.QueuedPacket> i = new CheckedConcurrentLinkedQueue<NetworkManager.QueuedPacket>(); private final Queue<NetworkManager.QueuedPacket> getPacketQueue() { return this.i; } // Paper - Anti-Xray - OBFHELPER // Akarin
private final ReentrantReadWriteLock j = new ReentrantReadWriteLock(); private final ReentrantReadWriteLock j = new ReentrantReadWriteLock();
public Channel channel; public Channel channel;
// Spigot Start // PAIL // Spigot Start // PAIL