Do not allow ticket level decreases to be processed asynchronously
Note: This cannot happen on the Fabric/NeoForge versions since async ticket level processing is not allowed, but can happen on Paper. This change is made here so that Paper can remain in sync. Ticket level decreases may be handled asynchronously when the off-thread invokes processTicketUpdates() when the main thread is running ChunkHolderManager#tick(). This is because the ticket update is queued during tick(), but not executed (invoking processTicketUpdates) until after releasing the ticket lock. This creates a small window for an off-thread to invoke processTicketUpdates() and steal the update. When the update is stolen, the full chunk status update (if any) will be eventually queued to execute via the chunk task queue. If the chunk queue is processed during the server tick at any point other than the ChunkHolderManager tick, then any ticket level decrease will violate an important invariant in the Moonrise chunk system: ticket level decreases only occur during ChunkHolderManager tick. This invariant exists to make interfacing with the chunk system easier, especially working with off-thread contexts. This change is specifically made to work towards fixing https://github.com/PaperMC/Folia/issues/363
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
package ca.spottedleaf.moonrise.patches.chunk_system.scheduling;
|
||||
|
||||
import ca.spottedleaf.concurrentutil.collection.MultiThreadedQueue;
|
||||
import ca.spottedleaf.concurrentutil.lock.ReentrantAreaLock;
|
||||
import ca.spottedleaf.concurrentutil.map.ConcurrentLong2ReferenceChainedHashTable;
|
||||
import ca.spottedleaf.concurrentutil.util.Priority;
|
||||
@@ -81,6 +82,7 @@ public final class ChunkHolderManager {
|
||||
private long currentTick;
|
||||
|
||||
private final ArrayDeque<NewChunkHolder> pendingFullLoadUpdate = new ArrayDeque<>();
|
||||
private final MultiThreadedQueue<NewChunkHolder> offThreadPendingFullLoadUpdate = new MultiThreadedQueue<>();
|
||||
private final ObjectRBTreeSet<NewChunkHolder> autoSaveQueue = new ObjectRBTreeSet<>((final NewChunkHolder c1, final NewChunkHolder c2) -> {
|
||||
if (c1 == c2) {
|
||||
return 0;
|
||||
@@ -109,20 +111,20 @@ public final class ChunkHolderManager {
|
||||
this.unloadQueue = new ChunkUnloadQueue(((ChunkSystemServerLevel)world).moonrise$getRegionChunkShift());
|
||||
}
|
||||
|
||||
public boolean processTicketUpdates(final int posX, final int posZ) {
|
||||
public boolean processTicketUpdates(final int chunkX, final int chunkZ) {
|
||||
final int ticketShift = ThreadedTicketLevelPropagator.SECTION_SHIFT;
|
||||
final int ticketMask = (1 << ticketShift) - 1;
|
||||
final List<ChunkProgressionTask> scheduledTasks = new ArrayList<>();
|
||||
final List<NewChunkHolder> changedFullStatus = new ArrayList<>();
|
||||
final boolean ret;
|
||||
final ReentrantAreaLock.Node ticketLock = this.ticketLockArea.lock(
|
||||
((posX >> ticketShift) - 1) << ticketShift,
|
||||
((posZ >> ticketShift) - 1) << ticketShift,
|
||||
(((posX >> ticketShift) + 1) << ticketShift) | ticketMask,
|
||||
(((posZ >> ticketShift) + 1) << ticketShift) | ticketMask
|
||||
((chunkX >> ticketShift) - 1) << ticketShift,
|
||||
((chunkZ >> ticketShift) - 1) << ticketShift,
|
||||
(((chunkX >> ticketShift) + 1) << ticketShift) | ticketMask,
|
||||
(((chunkZ >> ticketShift) + 1) << ticketShift) | ticketMask
|
||||
);
|
||||
try {
|
||||
ret = this.processTicketUpdatesNoLock(posX >> ticketShift, posZ >> ticketShift, scheduledTasks, changedFullStatus);
|
||||
ret = this.processTicketUpdatesNoLock(chunkX >> ticketShift, chunkZ >> ticketShift, scheduledTasks, changedFullStatus);
|
||||
} finally {
|
||||
this.ticketLockArea.unlock(ticketLock);
|
||||
}
|
||||
@@ -720,6 +722,9 @@ public final class ChunkHolderManager {
|
||||
return removeDelay <= 0L;
|
||||
};
|
||||
|
||||
final List<ChunkProgressionTask> scheduledTasks = new ArrayList<>();
|
||||
final List<NewChunkHolder> changedFullStatus = new ArrayList<>();
|
||||
|
||||
for (final PrimitiveIterator.OfLong iterator = this.sectionToChunkToExpireCount.keyIterator(); iterator.hasNext();) {
|
||||
final long sectionKey = iterator.nextLong();
|
||||
|
||||
@@ -728,9 +733,16 @@ public final class ChunkHolderManager {
|
||||
continue;
|
||||
}
|
||||
|
||||
final int lowerChunkX = CoordinateUtils.getChunkX(sectionKey) << sectionShift;
|
||||
final int lowerChunkZ = CoordinateUtils.getChunkZ(sectionKey) << sectionShift;
|
||||
|
||||
final int ticketShift = ThreadedTicketLevelPropagator.SECTION_SHIFT;
|
||||
final int ticketMask = (1 << ticketShift) - 1;
|
||||
final ReentrantAreaLock.Node ticketLock = this.ticketLockArea.lock(
|
||||
CoordinateUtils.getChunkX(sectionKey) << sectionShift,
|
||||
CoordinateUtils.getChunkZ(sectionKey) << sectionShift
|
||||
((lowerChunkX >> ticketShift) - 1) << ticketShift,
|
||||
((lowerChunkZ >> ticketShift) - 1) << ticketShift,
|
||||
(((lowerChunkX >> ticketShift) + 1) << ticketShift) | ticketMask,
|
||||
(((lowerChunkZ >> ticketShift) + 1) << ticketShift) | ticketMask
|
||||
);
|
||||
|
||||
try {
|
||||
@@ -777,9 +789,23 @@ public final class ChunkHolderManager {
|
||||
if (chunkToExpireCount.isEmpty()) {
|
||||
this.sectionToChunkToExpireCount.remove(sectionKey);
|
||||
}
|
||||
|
||||
// In order to prevent a race condition where an off-thread invokes processTicketUpdates(), we need to process ticket updates here
|
||||
// so that we catch any additions to the changed full status list. If an off-thread were to process tickets here, it would not be guaranteed
|
||||
// that it would be added to the full changed status set by the end of the call - possibly allowing ticket level decreases to be processed
|
||||
// outside of this call, which is not an intended or expected of this chunk system.
|
||||
this.processTicketUpdatesNoLock(lowerChunkX >> ThreadedTicketLevelPropagator.SECTION_SHIFT, lowerChunkZ >> ThreadedTicketLevelPropagator.SECTION_SHIFT, scheduledTasks, changedFullStatus);
|
||||
} finally {
|
||||
this.ticketLockArea.unlock(ticketLock);
|
||||
}
|
||||
|
||||
this.addChangedStatuses(changedFullStatus);
|
||||
changedFullStatus.clear(); // clear for next loop iteration
|
||||
|
||||
for (int i = 0, len = scheduledTasks.size(); i < len; ++i) {
|
||||
scheduledTasks.get(i).schedule();
|
||||
}
|
||||
scheduledTasks.clear(); // clear for next loop iteration
|
||||
}
|
||||
|
||||
this.processTicketUpdates();
|
||||
@@ -1006,14 +1032,9 @@ public final class ChunkHolderManager {
|
||||
return;
|
||||
}
|
||||
if (!TickThread.isTickThread()) {
|
||||
this.taskScheduler.scheduleChunkTask(() -> {
|
||||
final ArrayDeque<NewChunkHolder> pendingFullLoadUpdate = ChunkHolderManager.this.pendingFullLoadUpdate;
|
||||
for (int i = 0, len = changedFullStatus.size(); i < len; ++i) {
|
||||
pendingFullLoadUpdate.add(changedFullStatus.get(i));
|
||||
}
|
||||
|
||||
ChunkHolderManager.this.processPendingFullUpdate();
|
||||
}, Priority.HIGHEST);
|
||||
// These will be handled on the next ServerChunkCache$MainThreadExecutor#pollTask, as it runs the distance manager update
|
||||
// which will invoke processTicketUpdates
|
||||
this.offThreadPendingFullLoadUpdate.addAll(changedFullStatus);
|
||||
} else {
|
||||
final ArrayDeque<NewChunkHolder> pendingFullLoadUpdate = this.pendingFullLoadUpdate;
|
||||
for (int i = 0, len = changedFullStatus.size(); i < len; ++i) {
|
||||
@@ -1294,36 +1315,20 @@ public final class ChunkHolderManager {
|
||||
}
|
||||
|
||||
public boolean processTicketUpdates() {
|
||||
return this.processTicketUpdates(true, null);
|
||||
}
|
||||
|
||||
private static final ThreadLocal<List<ChunkProgressionTask>> CURRENT_TICKET_UPDATE_SCHEDULING = new ThreadLocal<>();
|
||||
|
||||
static List<ChunkProgressionTask> getCurrentTicketUpdateScheduling() {
|
||||
return CURRENT_TICKET_UPDATE_SCHEDULING.get();
|
||||
}
|
||||
|
||||
private boolean processTicketUpdates(final boolean processFullUpdates, List<ChunkProgressionTask> scheduledTasks) {
|
||||
if (BLOCK_TICKET_UPDATES.get() == Boolean.TRUE) {
|
||||
throw new IllegalStateException("Cannot update ticket level while unloading chunks or updating entity manager");
|
||||
}
|
||||
if (!PlatformHooks.get().allowAsyncTicketUpdates() && !TickThread.isTickThread()) {
|
||||
final boolean isTickThread = TickThread.isTickThread();
|
||||
|
||||
if (!PlatformHooks.get().allowAsyncTicketUpdates() && isTickThread) {
|
||||
TickThread.ensureTickThread("Cannot asynchronously process ticket updates");
|
||||
}
|
||||
|
||||
List<NewChunkHolder> changedFullStatus = null;
|
||||
|
||||
final boolean isTickThread = TickThread.isTickThread();
|
||||
|
||||
boolean ret = false;
|
||||
final boolean canProcessFullUpdates = processFullUpdates & isTickThread;
|
||||
final boolean canProcessScheduling = scheduledTasks == null;
|
||||
|
||||
if (this.ticketLevelPropagator.hasPendingUpdates()) {
|
||||
if (scheduledTasks == null) {
|
||||
scheduledTasks = new ArrayList<>();
|
||||
}
|
||||
changedFullStatus = new ArrayList<>();
|
||||
final List<ChunkProgressionTask> scheduledTasks = new ArrayList<>();
|
||||
final List<NewChunkHolder> changedFullStatus = new ArrayList<>();
|
||||
|
||||
this.blockTicketUpdates();
|
||||
try {
|
||||
@@ -1334,27 +1339,42 @@ public final class ChunkHolderManager {
|
||||
} finally {
|
||||
this.unblockTicketUpdates(Boolean.FALSE);
|
||||
}
|
||||
}
|
||||
|
||||
if (changedFullStatus != null) {
|
||||
this.addChangedStatuses(changedFullStatus);
|
||||
}
|
||||
|
||||
if (canProcessScheduling && scheduledTasks != null) {
|
||||
for (int i = 0, len = scheduledTasks.size(); i < len; ++i) {
|
||||
scheduledTasks.get(i).schedule();
|
||||
}
|
||||
}
|
||||
|
||||
if (canProcessFullUpdates) {
|
||||
if (isTickThread) {
|
||||
ret |= this.processPendingFullUpdate();
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
private static final ThreadLocal<List<ChunkProgressionTask>> CURRENT_TICKET_UPDATE_SCHEDULING = new ThreadLocal<>();
|
||||
|
||||
static List<ChunkProgressionTask> getCurrentTicketUpdateScheduling() {
|
||||
return CURRENT_TICKET_UPDATE_SCHEDULING.get();
|
||||
}
|
||||
|
||||
// only call on tick thread
|
||||
private void processOffThreadFullUpdates() {
|
||||
final ArrayDeque<NewChunkHolder> pendingFullLoadUpdate = this.pendingFullLoadUpdate;
|
||||
final MultiThreadedQueue<NewChunkHolder> offThreadPendingFullLoadUpdate = this.offThreadPendingFullLoadUpdate;
|
||||
|
||||
NewChunkHolder toUpdate;
|
||||
while ((toUpdate = offThreadPendingFullLoadUpdate.poll()) != null) {
|
||||
pendingFullLoadUpdate.add(toUpdate);
|
||||
}
|
||||
}
|
||||
|
||||
// only call on tick thread
|
||||
private boolean processPendingFullUpdate() {
|
||||
this.processOffThreadFullUpdates();
|
||||
|
||||
final ArrayDeque<NewChunkHolder> pendingFullLoadUpdate = this.pendingFullLoadUpdate;
|
||||
|
||||
boolean ret = false;
|
||||
|
||||
Reference in New Issue
Block a user