Sync chunk system changes from Paper
Additionally, slightly increase parallelism of ticket level propagation
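In rough outline, the new acquireNextOrWait logic in the diff below lets several worker threads claim queued ticket updates whose sections do not overlap, instead of serializing on a single lastUpdating pointer. A minimal standalone sketch of that idea (illustrative names such as Node and tryAcquire, not the patched classes):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicBoolean;

    final class ParallelAcquireSketch {
        static final class Node {
            final int sectionX;
            final int sectionZ;
            final AtomicBoolean updating = new AtomicBoolean(false);

            Node(final int sectionX, final int sectionZ) {
                this.sectionX = sectionX;
                this.sectionZ = sectionZ;
            }

            // chebyshev distance check, mirroring the intersects() method added below
            boolean intersects(final Node other, final int radius) {
                final int dist = Math.max(Math.abs(this.sectionX - other.sectionX), Math.abs(this.sectionZ - other.sectionZ));
                return dist <= radius;
            }
        }

        // Scan the queue in order: skip (but remember) nodes already being updated,
        // and never claim a node that overlaps a skipped node, so that updates to
        // intersecting areas are still applied in queue order.
        static Node tryAcquire(final List<Node> queue, final int radius) {
            final List<Node> blocking = new ArrayList<>();
            node_search:
            for (final Node curr : queue) {
                if (curr.updating.get()) {
                    blocking.add(curr);
                    continue;
                }
                for (final Node blocked : blocking) {
                    if (blocked.intersects(curr, radius)) {
                        continue node_search;
                    }
                }
                if (curr.updating.getAndSet(true)) {
                    // lost the race; treat as blocked and keep scanning
                    blocking.add(curr);
                    continue;
                }
                return curr; // claimed exclusively by this thread
            }
            return null; // nothing non-overlapping is available right now
        }
    }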
@@ -107,4 +107,9 @@ public abstract class SortedArraySetMixin<T> extends AbstractSet<T> implements C

        return ret;
    }

    @Override
    public Object[] moonrise$copyBackingArray() {
        return this.contents.clone();
    }
}

@@ -824,34 +824,6 @@ public final class RegionFileIOThread extends PrioritisedQueueExecutorThread {
        return thread.loadDataAsyncInternal(world, chunkX, chunkZ, type, onComplete, intendingToBlock, priority);
    }

    private static Boolean doesRegionFileExist(final int chunkX, final int chunkZ, final boolean intendingToBlock,
                                               final ChunkDataController taskController) {
        final ChunkPos chunkPos = new ChunkPos(chunkX, chunkZ);
        if (intendingToBlock) {
            return taskController.computeForRegionFile(chunkX, chunkZ, true, (final RegionFile file) -> {
                if (file == null) { // null if no regionfile exists
                    return Boolean.FALSE;
                }

                return file.hasChunk(chunkPos) ? Boolean.TRUE : Boolean.FALSE;
            });
        } else {
            // first check if the region file for sure does not exist
            if (taskController.doesRegionFileNotExist(chunkX, chunkZ)) {
                return Boolean.FALSE;
            } // else: it either exists or is not known, fall back to checking the loaded region file

            return taskController.computeForRegionFileIfLoaded(chunkX, chunkZ, (final RegionFile file) -> {
                if (file == null) { // null if not loaded
                    // not sure at this point, let the I/O thread figure it out
                    return Boolean.TRUE;
                }

                return file.hasChunk(chunkPos) ? Boolean.TRUE : Boolean.FALSE;
            });
        }
    }

    Cancellable loadDataAsyncInternal(final ServerLevel world, final int chunkX, final int chunkZ,
                                      final RegionFileType type, final BiConsumer<CompoundTag, Throwable> onComplete,
                                      final boolean intendingToBlock, final Priority priority) {

@@ -864,20 +836,6 @@ public final class RegionFileIOThread extends PrioritisedQueueExecutorThread {
                if (running == null) {
                    // not scheduled

                    if (callbackInfo.regionFileCalculation == null) {
                        // caller will compute this outside of compute(), to avoid holding the bin lock
                        callbackInfo.needsRegionFileTest = true;
                        return null;
                    }

                    if (callbackInfo.regionFileCalculation == Boolean.FALSE) {
                        // not on disk
                        callbackInfo.data = null;
                        callbackInfo.throwable = null;
                        callbackInfo.completeNow = true;
                        return null;
                    }

                    // set up task
                    final ChunkDataTask newTask = new ChunkDataTask(
                        world, chunkX, chunkZ, taskController, RegionFileIOThread.this, priority

@@ -908,17 +866,7 @@ public final class RegionFileIOThread extends PrioritisedQueueExecutorThread {
            return running;
        };

        ChunkDataTask curr = taskController.tasks.get(key);
        if (curr == null) {
            callbackInfo.regionFileCalculation = doesRegionFileExist(chunkX, chunkZ, intendingToBlock, taskController);
        }
        ChunkDataTask ret = taskController.tasks.compute(key, compute);
        if (callbackInfo.needsRegionFileTest) {
            // curr isn't null but when we went into compute() it was
            callbackInfo.regionFileCalculation = doesRegionFileExist(chunkX, chunkZ, intendingToBlock, taskController);
            // now it should be fine
            ret = taskController.tasks.compute(key, compute);
        }
        final ChunkDataTask ret = taskController.tasks.compute(key, compute);

        // needs to be scheduled
        if (callbackInfo.tasksNeedsScheduling) {

@@ -975,8 +923,6 @@ public final class RegionFileIOThread extends PrioritisedQueueExecutorThread {
        public Throwable throwable;
        public boolean completeNow;
        public boolean tasksNeedsScheduling;
        public boolean needsRegionFileTest;
        public Boolean regionFileCalculation;

    }

@@ -1043,7 +989,7 @@ public final class RegionFileIOThread extends PrioritisedQueueExecutorThread {

        private CompoundTag value;
        private Throwable throwable;
        private MultiThreadedQueue<BiConsumer<CompoundTag, Throwable>> callbacks = new MultiThreadedQueue<>();
        private final MultiThreadedQueue<BiConsumer<CompoundTag, Throwable>> callbacks = new MultiThreadedQueue<>();

        public boolean hasNoWaiters() {
            return this.callbacks.isEmpty();

@@ -48,6 +48,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.PrimitiveIterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

@@ -1361,40 +1362,13 @@ public final class ChunkHolderManager {
            holders.add(holder.getDebugJson());
        }

        /* TODO
        final JsonArray removeTickToChunkExpireTicketCount = new JsonArray();
        ret.add("remove_tick_to_chunk_expire_ticket_count", removeTickToChunkExpireTicketCount);

        for (final Long2ObjectMap.Entry<Long2IntOpenHashMap> tickEntry : this.removeTickToChunkExpireTicketCount.long2ObjectEntrySet()) {
            final long tick = tickEntry.getLongKey();
            final Long2IntOpenHashMap coordinateToCount = tickEntry.getValue();

            final JsonObject tickJson = new JsonObject();
            removeTickToChunkExpireTicketCount.add(tickJson);

            tickJson.addProperty("tick", Long.valueOf(tick));

            final JsonArray tickEntries = new JsonArray();
            tickJson.add("entries", tickEntries);

            for (final Long2IntMap.Entry entry : coordinateToCount.long2IntEntrySet()) {
                final long coordinate = entry.getLongKey();
                final int count = entry.getIntValue();

                final JsonObject entryJson = new JsonObject();
                tickEntries.add(entryJson);

                entryJson.addProperty("chunkX", Long.valueOf(CoordinateUtils.getChunkX(coordinate)));
                entryJson.addProperty("chunkZ", Long.valueOf(CoordinateUtils.getChunkZ(coordinate)));
                entryJson.addProperty("count", Integer.valueOf(count));
            }
        }

        final JsonArray allTicketsJson = new JsonArray();
        ret.add("tickets", allTicketsJson);

        for (final Long2ObjectMap.Entry<SortedArraySet<Ticket<?>>> coordinateTickets : this.tickets.long2ObjectEntrySet()) {
            final long coordinate = coordinateTickets.getLongKey();
        for (final Iterator<ConcurrentLong2ReferenceChainedHashTable.TableEntry<SortedArraySet<Ticket<?>>>> iterator = this.tickets.entryIterator();
             iterator.hasNext();) {
            final ConcurrentLong2ReferenceChainedHashTable.TableEntry<SortedArraySet<Ticket<?>>> coordinateTickets = iterator.next();
            final long coordinate = coordinateTickets.getKey();
            final SortedArraySet<Ticket<?>> tickets = coordinateTickets.getValue();

            final JsonObject coordinateJson = new JsonObject();

@@ -1406,17 +1380,24 @@ public final class ChunkHolderManager {
            final JsonArray ticketsSerialized = new JsonArray();
            coordinateJson.add("tickets", ticketsSerialized);

            for (final Ticket<?> ticket : tickets) {
            // note: by using a copy of the backing array, we can avoid explicit exceptions we may trip when iterating
            // directly over the set using the iterator
            // however, it also means we need to null-check the values, and there is a possibility that we _miss_ an
            // entry OR iterate over an entry multiple times
            for (final Object ticketUncasted : ((ChunkSystemSortedArraySet<Ticket<?>>)tickets).moonrise$copyBackingArray()) {
                if (ticketUncasted == null) {
                    continue;
                }
                final Ticket<?> ticket = (Ticket<?>)ticketUncasted;
                final JsonObject ticketSerialized = new JsonObject();
                ticketsSerialized.add(ticketSerialized);

                ticketSerialized.addProperty("type", ticket.getType().toString());
                ticketSerialized.addProperty("level", Integer.valueOf(ticket.getTicketLevel()));
                ticketSerialized.addProperty("identifier", Objects.toString(ticket.key));
                ticketSerialized.addProperty("remove_tick", Long.valueOf(ticket.removalTick));
                ticketSerialized.addProperty("remove_tick", Long.valueOf(((ChunkSystemTicket<?>)(Object)ticket).moonrise$getRemoveDelay()));
            }
        }
        */

        return ret;
    }

@@ -212,21 +212,17 @@ public final class ChunkTaskScheduler {
    }

    private static final int[] ACCESS_RADIUS_TABLE = new int[ChunkStatus.getStatusList().size()];
    private static final int[] MAX_ACCESS_RADIUS_TABLE = new int[ACCESS_RADIUS_TABLE.length];
    static {
        Arrays.fill(ACCESS_RADIUS_TABLE, -1);
    }

    private static int getAccessRadius0(final ChunkStatus genStatus) {
        if (genStatus == ChunkStatus.EMPTY) {
            return 0;
        }

        final int radius = Math.max(((ChunkSystemChunkStatus)genStatus).moonrise$getLoadRadius(), genStatus.getRange());
        final int radius = genStatus.getRange();
        int maxRange = radius;

        for (int dist = 1; dist <= radius; ++dist) {
            final ChunkStatus requiredNeighbourStatus = ChunkStatus.getStatusAroundFullChunk(ChunkStatus.getDistance(genStatus) + dist);
        for (int dist = 0; dist <= radius; ++dist) {
            final ChunkStatus requiredNeighbourStatus = dist == 0 ? genStatus.getParent() : ChunkStatus.getStatusAroundFullChunk(ChunkStatus.getDistance(genStatus) + dist);
            final int rad = ACCESS_RADIUS_TABLE[requiredNeighbourStatus.getIndex()];
            if (rad == -1) {
                throw new IllegalStateException();

@@ -238,22 +234,18 @@ public final class ChunkTaskScheduler {
        return maxRange;
    }

    private static int maxAccessRadius;
    private static final int MAX_ACCESS_RADIUS;

    static {
        final List<ChunkStatus> statuses = ChunkStatus.getStatusList();
        for (int i = 0, len = statuses.size(); i < len; ++i) {
            ACCESS_RADIUS_TABLE[i] = getAccessRadius0(statuses.get(i));
        }
        int max = 0;
        for (int i = 0, len = statuses.size(); i < len; ++i) {
            MAX_ACCESS_RADIUS_TABLE[i] = max = Math.max(ACCESS_RADIUS_TABLE[i], max);
        }
        maxAccessRadius = max;
        MAX_ACCESS_RADIUS = ACCESS_RADIUS_TABLE[ACCESS_RADIUS_TABLE.length - 1];
    }

    public static int getMaxAccessRadius() {
        return maxAccessRadius;
        return MAX_ACCESS_RADIUS;
    }

    public static int getAccessRadius(final ChunkStatus genStatus) {

@@ -12,6 +12,7 @@ import it.unimi.dsi.fastutil.shorts.Short2ByteMap;
import it.unimi.dsi.fastutil.shorts.ShortOpenHashSet;
import java.lang.invoke.VarHandle;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

@@ -29,6 +30,10 @@ public abstract class ThreadedTicketLevelPropagator {
    // a 1 level to 0; and if a source was 63 then it may cross more than 2 sections in de-propagation
    private static final int MAX_SOURCE_LEVEL = 62;

    private static int getMaxSchedulingRadius() {
        return 2 * ChunkTaskScheduler.getMaxAccessRadius();
    }

    private final UpdateQueue updateQueue;
    private final ConcurrentLong2ReferenceChainedHashTable<Section> sections;

@@ -305,7 +310,7 @@ public abstract class ThreadedTicketLevelPropagator {

        if (!propagator.updatedPositions.isEmpty()) {
            // now we can actually update the ticket levels in the chunk holders
            final int maxScheduleRadius = 2 * ChunkTaskScheduler.getMaxAccessRadius();
            final int maxScheduleRadius = getMaxSchedulingRadius();

            // allow the chunkholders to process ticket level updates without needing to acquire the schedule lock every time
            final ReentrantAreaLock.Node schedulingNode = schedulingLock.lock(

@@ -347,10 +352,8 @@ public abstract class ThreadedTicketLevelPropagator {
        Propagator propagator = null;

        for (;;) {
            final UpdateQueue.UpdateQueueNode toUpdate = this.updateQueue.acquireNextToUpdate(maxOrder);
            final UpdateQueue.UpdateQueueNode toUpdate = this.updateQueue.acquireNextOrWait(maxOrder);
            if (toUpdate == null) {
                this.updateQueue.awaitFirst(maxOrder);

                if (!this.updateQueue.hasRemainingUpdates(maxOrder)) {
                    if (propagator != null) {
                        Propagator.returnPropagator(propagator);

@@ -375,11 +378,9 @@ public abstract class ThreadedTicketLevelPropagator {

        private volatile UpdateQueueNode head;
        private volatile UpdateQueueNode tail;
        private volatile UpdateQueueNode lastUpdating;

        private static final VarHandle HEAD_HANDLE = ConcurrentUtil.getVarHandle(UpdateQueue.class, "head", UpdateQueueNode.class);
        private static final VarHandle TAIL_HANDLE = ConcurrentUtil.getVarHandle(UpdateQueue.class, "tail", UpdateQueueNode.class);
        private static final VarHandle LAST_UPDATING = ConcurrentUtil.getVarHandle(UpdateQueue.class, "lastUpdating", UpdateQueueNode.class);

        /* head */

@@ -421,16 +422,6 @@ public abstract class ThreadedTicketLevelPropagator {
            return (UpdateQueueNode)TAIL_HANDLE.getOpaque(this);
        }

        /* lastUpdating */

        private final UpdateQueueNode getLastUpdatingVolatile() {
            return (UpdateQueueNode)LAST_UPDATING.getVolatile(this);
        }

        private final UpdateQueueNode compareAndExchangeLastUpdatingVolatile(final UpdateQueueNode expect, final UpdateQueueNode update) {
            return (UpdateQueueNode)LAST_UPDATING.compareAndExchange(this, expect, update);
        }

        public UpdateQueue() {
            final UpdateQueueNode dummy = new UpdateQueueNode(null, null);
            dummy.order = -1L;

@@ -463,46 +454,57 @@ public abstract class ThreadedTicketLevelPropagator {
            }
        }

        public UpdateQueueNode acquireNextToUpdate(final long maxOrder) {
            int failures = 0;
            for (UpdateQueueNode prev = this.getLastUpdatingVolatile();;) {
                UpdateQueueNode next = prev == null ? this.peek() : prev.next;

                if (next == null || next.order > maxOrder) {
                    return null;
                }

                for (int i = 0; i < failures; ++i) {
                    ConcurrentUtil.backoff();
                }

                if (prev == (prev = this.compareAndExchangeLastUpdatingVolatile(prev, next))) {
                    return next;
                }

                ++failures;
            }
        }

        public void awaitFirst(final long maxOrder) {
            final UpdateQueueNode earliest = this.peek();
            if (earliest == null || earliest.order > maxOrder) {
                return;
            }

        private static void await(final UpdateQueueNode node) {
            final Thread currThread = Thread.currentThread();
            // we do not use add-blocking because we use the nullability of the section to block
            // remove() does not begin to poll from the wait queue until the section is null'd,
            // and so provided we check the nullability before parking there is no ordering of these operations
            // such that remove() finishes polling from the wait queue while section is not null
            earliest.add(currThread);
            node.add(currThread);

            // wait until completed
            while (earliest.getSectionVolatile() != null) {
            while (node.getSectionVolatile() != null) {
                LockSupport.park();
            }
        }

        public UpdateQueueNode acquireNextOrWait(final long maxOrder) {
            final List<UpdateQueueNode> blocking = new ArrayList<>();

            node_search:
            for (UpdateQueueNode curr = this.peek(); curr != null && curr.order <= maxOrder; curr = curr.getNextVolatile()) {
                if (curr.getSectionVolatile() == null) {
                    continue;
                }

                if (curr.getUpdatingVolatile()) {
                    blocking.add(curr);
                    continue;
                }

                for (int i = 0, len = blocking.size(); i < len; ++i) {
                    final UpdateQueueNode node = blocking.get(i);

                    if (node.intersects(curr)) {
                        continue node_search;
                    }
                }

                if (curr.getAndSetUpdatingVolatile(true)) {
                    blocking.add(curr);
                    continue;
                }

                return curr;
            }

            if (!blocking.isEmpty()) {
                await(blocking.get(0));
            }

            return null;
        }

        public UpdateQueueNode peek() {
            for (UpdateQueueNode head = this.getHeadOpaque(), curr = head;;) {
                final UpdateQueueNode next = curr.getNextVolatile();

@@ -587,18 +589,37 @@ public abstract class ThreadedTicketLevelPropagator {
        // each node also represents a set of waiters, represented by the MTQ
        // if the queue is add-blocked, then the update is complete
        private static final class UpdateQueueNode extends MultiThreadedQueue<Thread> {
            private final int sectionX;
            private final int sectionZ;

            private long order;
            private Section section;
            private volatile Section section;
            private volatile UpdateQueueNode next;
            private volatile boolean updating;

            private static final VarHandle SECTION_HANDLE = ConcurrentUtil.getVarHandle(UpdateQueueNode.class, "section", Section.class);
            private static final VarHandle NEXT_HANDLE = ConcurrentUtil.getVarHandle(UpdateQueueNode.class, "next", UpdateQueueNode.class);
            private static final VarHandle UPDATING_HANDLE = ConcurrentUtil.getVarHandle(UpdateQueueNode.class, "updating", boolean.class);

            public UpdateQueueNode(final Section section, final UpdateQueueNode next) {
                if (section == null) {
                    this.sectionX = this.sectionZ = 0;
                } else {
                    this.sectionX = section.sectionX;
                    this.sectionZ = section.sectionZ;
                }

                SECTION_HANDLE.set(this, section);
                NEXT_HANDLE.set(this, next);
            }

            public boolean intersects(final UpdateQueueNode other) {
                final int dist = Math.max(Math.abs(this.sectionX - other.sectionX), Math.abs(this.sectionZ - other.sectionZ));

                // intersection radius is ticket update radius (1) + scheduling radius
                return dist <= (1 + ((getMaxSchedulingRadius() + (SECTION_SIZE - 1)) >> SECTION_SHIFT));
            }

            /* section */

            private final Section getSectionPlain() {

@@ -658,6 +679,16 @@ public abstract class ThreadedTicketLevelPropagator {
            private final UpdateQueueNode compareExchangeNextVolatile(final UpdateQueueNode expect, final UpdateQueueNode set) {
                return (UpdateQueueNode)NEXT_HANDLE.compareAndExchange(this, expect, set);
            }

            /* updating */

            private final boolean getUpdatingVolatile() {
                return (boolean)UPDATING_HANDLE.getVolatile(this);
            }

            private final boolean getAndSetUpdatingVolatile(final boolean value) {
                return (boolean)UPDATING_HANDLE.getAndSet(this, value);
            }
        }
    }

@@ -6,6 +6,8 @@ public interface ChunkSystemSortedArraySet<T> {

    public SortedArraySet<T> moonrise$copy();

    public Object[] moonrise$copyBackingArray();

    public T moonrise$replace(final T object);

    public T moonrise$removeAndGet(final T object);
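
For context, the new moonrise$copyBackingArray accessor supports the snapshot-iteration pattern used in the ticket debug dump above. A minimal sketch (hypothetical helper, not part of the patch) of why the copied array must be null-checked:

    import java.util.function.Consumer;

    final class SnapshotIterationSketch {
        // Iterating a clone of the set's backing array cannot throw
        // ConcurrentModificationException, but unused trailing slots are null and,
        // as the note in the dump above says, concurrent writers may cause entries
        // to be missed or visited more than once.
        static <T> void forEachSnapshot(final Object[] snapshot, final Class<T> type, final Consumer<T> action) {
            for (final Object element : snapshot) {
                if (element == null) {
                    continue; // unused capacity in the backing array
                }
                action.accept(type.cast(element));
            }
        }
    }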