diff --git a/src/main/java/ca/spottedleaf/moonrise/patches/chunk_system/scheduling/ChunkHolderManager.java b/src/main/java/ca/spottedleaf/moonrise/patches/chunk_system/scheduling/ChunkHolderManager.java index e4ff355..9d9315b 100644 --- a/src/main/java/ca/spottedleaf/moonrise/patches/chunk_system/scheduling/ChunkHolderManager.java +++ b/src/main/java/ca/spottedleaf/moonrise/patches/chunk_system/scheduling/ChunkHolderManager.java @@ -1,5 +1,6 @@ package ca.spottedleaf.moonrise.patches.chunk_system.scheduling; +import ca.spottedleaf.concurrentutil.collection.MultiThreadedQueue; import ca.spottedleaf.concurrentutil.lock.ReentrantAreaLock; import ca.spottedleaf.concurrentutil.map.ConcurrentLong2ReferenceChainedHashTable; import ca.spottedleaf.concurrentutil.util.Priority; @@ -81,6 +82,7 @@ public final class ChunkHolderManager { private long currentTick; private final ArrayDeque pendingFullLoadUpdate = new ArrayDeque<>(); + private final MultiThreadedQueue offThreadPendingFullLoadUpdate = new MultiThreadedQueue<>(); private final ObjectRBTreeSet autoSaveQueue = new ObjectRBTreeSet<>((final NewChunkHolder c1, final NewChunkHolder c2) -> { if (c1 == c2) { return 0; @@ -109,20 +111,20 @@ public final class ChunkHolderManager { this.unloadQueue = new ChunkUnloadQueue(((ChunkSystemServerLevel)world).moonrise$getRegionChunkShift()); } - public boolean processTicketUpdates(final int posX, final int posZ) { + public boolean processTicketUpdates(final int chunkX, final int chunkZ) { final int ticketShift = ThreadedTicketLevelPropagator.SECTION_SHIFT; final int ticketMask = (1 << ticketShift) - 1; final List scheduledTasks = new ArrayList<>(); final List changedFullStatus = new ArrayList<>(); final boolean ret; final ReentrantAreaLock.Node ticketLock = this.ticketLockArea.lock( - ((posX >> ticketShift) - 1) << ticketShift, - ((posZ >> ticketShift) - 1) << ticketShift, - (((posX >> ticketShift) + 1) << ticketShift) | ticketMask, - (((posZ >> ticketShift) + 1) << ticketShift) | ticketMask + ((chunkX >> ticketShift) - 1) << ticketShift, + ((chunkZ >> ticketShift) - 1) << ticketShift, + (((chunkX >> ticketShift) + 1) << ticketShift) | ticketMask, + (((chunkZ >> ticketShift) + 1) << ticketShift) | ticketMask ); try { - ret = this.processTicketUpdatesNoLock(posX >> ticketShift, posZ >> ticketShift, scheduledTasks, changedFullStatus); + ret = this.processTicketUpdatesNoLock(chunkX >> ticketShift, chunkZ >> ticketShift, scheduledTasks, changedFullStatus); } finally { this.ticketLockArea.unlock(ticketLock); } @@ -720,6 +722,9 @@ public final class ChunkHolderManager { return removeDelay <= 0L; }; + final List scheduledTasks = new ArrayList<>(); + final List changedFullStatus = new ArrayList<>(); + for (final PrimitiveIterator.OfLong iterator = this.sectionToChunkToExpireCount.keyIterator(); iterator.hasNext();) { final long sectionKey = iterator.nextLong(); @@ -728,9 +733,16 @@ public final class ChunkHolderManager { continue; } + final int lowerChunkX = CoordinateUtils.getChunkX(sectionKey) << sectionShift; + final int lowerChunkZ = CoordinateUtils.getChunkZ(sectionKey) << sectionShift; + + final int ticketShift = ThreadedTicketLevelPropagator.SECTION_SHIFT; + final int ticketMask = (1 << ticketShift) - 1; final ReentrantAreaLock.Node ticketLock = this.ticketLockArea.lock( - CoordinateUtils.getChunkX(sectionKey) << sectionShift, - CoordinateUtils.getChunkZ(sectionKey) << sectionShift + ((lowerChunkX >> ticketShift) - 1) << ticketShift, + ((lowerChunkZ >> ticketShift) - 1) << ticketShift, + (((lowerChunkX >> ticketShift) + 1) << ticketShift) | ticketMask, + (((lowerChunkZ >> ticketShift) + 1) << ticketShift) | ticketMask ); try { @@ -777,9 +789,23 @@ public final class ChunkHolderManager { if (chunkToExpireCount.isEmpty()) { this.sectionToChunkToExpireCount.remove(sectionKey); } + + // In order to prevent a race condition where an off-thread invokes processTicketUpdates(), we need to process ticket updates here + // so that we catch any additions to the changed full status list. If an off-thread were to process tickets here, it would not be guaranteed + // that it would be added to the full changed status set by the end of the call - possibly allowing ticket level decreases to be processed + // outside of this call, which is not an intended or expected of this chunk system. + this.processTicketUpdatesNoLock(lowerChunkX >> ThreadedTicketLevelPropagator.SECTION_SHIFT, lowerChunkZ >> ThreadedTicketLevelPropagator.SECTION_SHIFT, scheduledTasks, changedFullStatus); } finally { this.ticketLockArea.unlock(ticketLock); } + + this.addChangedStatuses(changedFullStatus); + changedFullStatus.clear(); // clear for next loop iteration + + for (int i = 0, len = scheduledTasks.size(); i < len; ++i) { + scheduledTasks.get(i).schedule(); + } + scheduledTasks.clear(); // clear for next loop iteration } this.processTicketUpdates(); @@ -1006,14 +1032,9 @@ public final class ChunkHolderManager { return; } if (!TickThread.isTickThread()) { - this.taskScheduler.scheduleChunkTask(() -> { - final ArrayDeque pendingFullLoadUpdate = ChunkHolderManager.this.pendingFullLoadUpdate; - for (int i = 0, len = changedFullStatus.size(); i < len; ++i) { - pendingFullLoadUpdate.add(changedFullStatus.get(i)); - } - - ChunkHolderManager.this.processPendingFullUpdate(); - }, Priority.HIGHEST); + // These will be handled on the next ServerChunkCache$MainThreadExecutor#pollTask, as it runs the distance manager update + // which will invoke processTicketUpdates + this.offThreadPendingFullLoadUpdate.addAll(changedFullStatus); } else { final ArrayDeque pendingFullLoadUpdate = this.pendingFullLoadUpdate; for (int i = 0, len = changedFullStatus.size(); i < len; ++i) { @@ -1294,36 +1315,20 @@ public final class ChunkHolderManager { } public boolean processTicketUpdates() { - return this.processTicketUpdates(true, null); - } - - private static final ThreadLocal> CURRENT_TICKET_UPDATE_SCHEDULING = new ThreadLocal<>(); - - static List getCurrentTicketUpdateScheduling() { - return CURRENT_TICKET_UPDATE_SCHEDULING.get(); - } - - private boolean processTicketUpdates(final boolean processFullUpdates, List scheduledTasks) { if (BLOCK_TICKET_UPDATES.get() == Boolean.TRUE) { throw new IllegalStateException("Cannot update ticket level while unloading chunks or updating entity manager"); } - if (!PlatformHooks.get().allowAsyncTicketUpdates() && !TickThread.isTickThread()) { + final boolean isTickThread = TickThread.isTickThread(); + + if (!PlatformHooks.get().allowAsyncTicketUpdates() && isTickThread) { TickThread.ensureTickThread("Cannot asynchronously process ticket updates"); } - List changedFullStatus = null; - - final boolean isTickThread = TickThread.isTickThread(); - boolean ret = false; - final boolean canProcessFullUpdates = processFullUpdates & isTickThread; - final boolean canProcessScheduling = scheduledTasks == null; if (this.ticketLevelPropagator.hasPendingUpdates()) { - if (scheduledTasks == null) { - scheduledTasks = new ArrayList<>(); - } - changedFullStatus = new ArrayList<>(); + final List scheduledTasks = new ArrayList<>(); + final List changedFullStatus = new ArrayList<>(); this.blockTicketUpdates(); try { @@ -1334,27 +1339,42 @@ public final class ChunkHolderManager { } finally { this.unblockTicketUpdates(Boolean.FALSE); } - } - if (changedFullStatus != null) { this.addChangedStatuses(changedFullStatus); - } - if (canProcessScheduling && scheduledTasks != null) { for (int i = 0, len = scheduledTasks.size(); i < len; ++i) { scheduledTasks.get(i).schedule(); } } - if (canProcessFullUpdates) { + if (isTickThread) { ret |= this.processPendingFullUpdate(); } return ret; } + private static final ThreadLocal> CURRENT_TICKET_UPDATE_SCHEDULING = new ThreadLocal<>(); + + static List getCurrentTicketUpdateScheduling() { + return CURRENT_TICKET_UPDATE_SCHEDULING.get(); + } + + // only call on tick thread + private void processOffThreadFullUpdates() { + final ArrayDeque pendingFullLoadUpdate = this.pendingFullLoadUpdate; + final MultiThreadedQueue offThreadPendingFullLoadUpdate = this.offThreadPendingFullLoadUpdate; + + NewChunkHolder toUpdate; + while ((toUpdate = offThreadPendingFullLoadUpdate.poll()) != null) { + pendingFullLoadUpdate.add(toUpdate); + } + } + // only call on tick thread private boolean processPendingFullUpdate() { + this.processOffThreadFullUpdates(); + final ArrayDeque pendingFullLoadUpdate = this.pendingFullLoadUpdate; boolean ret = false;