Revert RegionizedTaskQueue referent counter changes
This commit is contained in:
@@ -0,0 +1,381 @@
|
||||
From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
|
||||
From: MrHua269 <mrhua269@gmail.com>
|
||||
Date: Thu, 20 Mar 2025 23:12:30 +0800
|
||||
Subject: [PATCH] Revert RegionizedTaskQueue referent counter changes
|
||||
|
||||
|
||||
diff --git a/io/papermc/paper/threadedregions/RegionizedTaskQueue.java b/io/papermc/paper/threadedregions/RegionizedTaskQueue.java
|
||||
index 745ab870310733b569681f5280895bb9798620a4..9b8d06c77aa71acbe25151d82777e5dfa4e4a5f7 100644
|
||||
--- a/io/papermc/paper/threadedregions/RegionizedTaskQueue.java
|
||||
+++ b/io/papermc/paper/threadedregions/RegionizedTaskQueue.java
|
||||
@@ -24,7 +24,7 @@ public final class RegionizedTaskQueue {
|
||||
|
||||
public PrioritisedExecutor.PrioritisedTask createChunkTask(final ServerLevel world, final int chunkX, final int chunkZ,
|
||||
final Runnable run) {
|
||||
- return this.createChunkTask(world, chunkX, chunkZ, run, Priority.NORMAL);
|
||||
+ return new PrioritisedQueue.ChunkBasedPriorityTask(world.taskQueueRegionData, chunkX, chunkZ, true, run, Priority.NORMAL);
|
||||
}
|
||||
|
||||
public PrioritisedExecutor.PrioritisedTask createChunkTask(final ServerLevel world, final int chunkX, final int chunkZ,
|
||||
@@ -34,7 +34,7 @@ public final class RegionizedTaskQueue {
|
||||
|
||||
public PrioritisedExecutor.PrioritisedTask createTickTaskQueue(final ServerLevel world, final int chunkX, final int chunkZ,
|
||||
final Runnable run) {
|
||||
- return this.createTickTaskQueue(world, chunkX, chunkZ, run, Priority.NORMAL);
|
||||
+ return new PrioritisedQueue.ChunkBasedPriorityTask(world.taskQueueRegionData, chunkX, chunkZ, false, run, Priority.NORMAL);
|
||||
}
|
||||
|
||||
public PrioritisedExecutor.PrioritisedTask createTickTaskQueue(final ServerLevel world, final int chunkX, final int chunkZ,
|
||||
@@ -73,7 +73,7 @@ public final class RegionizedTaskQueue {
|
||||
public static final class WorldRegionTaskData {
|
||||
private final ServerLevel world;
|
||||
private final MultiThreadedQueue<Runnable> globalChunkTask = new MultiThreadedQueue<>();
|
||||
- private final ConcurrentLong2ReferenceChainedHashTable<ReferenceCountData> referenceCounters = new ConcurrentLong2ReferenceChainedHashTable<>();
|
||||
+ private final ConcurrentLong2ReferenceChainedHashTable<AtomicLong> referenceCounters = new ConcurrentLong2ReferenceChainedHashTable<>();
|
||||
|
||||
public WorldRegionTaskData(final ServerLevel world) {
|
||||
this.world = world;
|
||||
@@ -99,7 +99,7 @@ public final class RegionizedTaskQueue {
|
||||
private PrioritisedQueue getQueue(final boolean synchronise, final int chunkX, final int chunkZ, final boolean isChunkTask) {
|
||||
final ThreadedRegionizer<TickRegions.TickRegionData, TickRegions.TickRegionSectionData> regioniser = this.world.regioniser;
|
||||
final ThreadedRegionizer.ThreadedRegion<TickRegions.TickRegionData, TickRegions.TickRegionSectionData> region
|
||||
- = synchronise ? regioniser.getRegionAtSynchronised(chunkX, chunkZ) : regioniser.getRegionAtUnsynchronised(chunkX, chunkZ);
|
||||
+ = synchronise ? regioniser.getRegionAtSynchronised(chunkX, chunkZ) : regioniser.getRegionAtUnsynchronised(chunkX, chunkZ);
|
||||
if (region == null) {
|
||||
return null;
|
||||
}
|
||||
@@ -109,13 +109,13 @@ public final class RegionizedTaskQueue {
|
||||
|
||||
private void removeTicket(final long coord) {
|
||||
this.world.moonrise$getChunkTaskScheduler().chunkHolderManager.removeTicketAtLevel(
|
||||
- TASK_QUEUE_TICKET, coord, ChunkHolderManager.MAX_TICKET_LEVEL, Unit.INSTANCE
|
||||
+ TASK_QUEUE_TICKET, coord, ChunkHolderManager.MAX_TICKET_LEVEL, Unit.INSTANCE
|
||||
);
|
||||
}
|
||||
|
||||
private void addTicket(final long coord) {
|
||||
this.world.moonrise$getChunkTaskScheduler().chunkHolderManager.addTicketAtLevel(
|
||||
- TASK_QUEUE_TICKET, coord, ChunkHolderManager.MAX_TICKET_LEVEL, Unit.INSTANCE
|
||||
+ TASK_QUEUE_TICKET, coord, ChunkHolderManager.MAX_TICKET_LEVEL, Unit.INSTANCE
|
||||
);
|
||||
}
|
||||
|
||||
@@ -123,95 +123,96 @@ public final class RegionizedTaskQueue {
|
||||
this.world.moonrise$getChunkTaskScheduler().chunkHolderManager.processTicketUpdates(CoordinateUtils.getChunkX(coord), CoordinateUtils.getChunkZ(coord));
|
||||
}
|
||||
|
||||
- // note: only call on acquired referenceCountData
|
||||
- private void ensureTicketAdded(final long coord, final ReferenceCountData referenceCountData) {
|
||||
- if (!referenceCountData.addedTicket) {
|
||||
- // fine if multiple threads do this, no removeTicket may be called for this coord due to reference count inc
|
||||
- this.addTicket(coord);
|
||||
- this.processTicketUpdates(coord);
|
||||
- referenceCountData.addedTicket = true;
|
||||
+ private void decrementReference(final AtomicLong reference, final long coord) {
|
||||
+ final long val = reference.decrementAndGet();
|
||||
+ if (val == 0L) {
|
||||
+ final int chunkX = CoordinateUtils.getChunkX(coord);
|
||||
+ final int chunkZ = CoordinateUtils.getChunkZ(coord);
|
||||
+ final ca.spottedleaf.concurrentutil.lock.ReentrantAreaLock.Node ticketLock = this.world.moonrise$getChunkTaskScheduler().chunkHolderManager.ticketLockArea.lock(chunkX, chunkZ);
|
||||
+ try {
|
||||
+ if (this.referenceCounters.remove(coord, reference) == reference) {
|
||||
+ WorldRegionTaskData.this.removeTicket(coord);
|
||||
+ } // else: race condition, something replaced our reference - not our issue anymore
|
||||
+ } finally {
|
||||
+ this.world.moonrise$getChunkTaskScheduler().chunkHolderManager.ticketLockArea.unlock(ticketLock);
|
||||
+ }
|
||||
+ } else if (val < 0L) {
|
||||
+ throw new IllegalStateException("Reference count < 0: " + val);
|
||||
}
|
||||
}
|
||||
|
||||
- private void decrementReference(final ReferenceCountData referenceCountData, final long coord) {
|
||||
- if (!referenceCountData.decreaseReferenceCount()) {
|
||||
- return;
|
||||
- } // else: need to remove ticket
|
||||
-
|
||||
- // note: it is possible that another thread increments and then removes the reference before we can, so
|
||||
- // use ifPresent
|
||||
- this.referenceCounters.computeIfPresent(coord, (final long keyInMap, final ReferenceCountData valueInMap) -> {
|
||||
- if (valueInMap.referenceCount.get() != 0L) {
|
||||
- return valueInMap;
|
||||
- }
|
||||
-
|
||||
- // note: valueInMap may not be referenceCountData
|
||||
-
|
||||
- // possible to invoke this outside of the compute call, but not required and requires additional logic
|
||||
- WorldRegionTaskData.this.removeTicket(keyInMap);
|
||||
+ private AtomicLong incrementReference(final long coord) {
|
||||
+ final AtomicLong ret = this.referenceCounters.get(coord);
|
||||
+ if (ret != null) {
|
||||
+ // try to fast acquire counter
|
||||
+ int failures = 0;
|
||||
+ for (long curr = ret.get();;) {
|
||||
+ if (curr == 0L) {
|
||||
+ // failed to fast acquire as reference expired
|
||||
+ break;
|
||||
+ }
|
||||
|
||||
- return null;
|
||||
- });
|
||||
- }
|
||||
+ for (int i = 0; i < failures; ++i) {
|
||||
+ ConcurrentUtil.backoff();
|
||||
+ }
|
||||
|
||||
- private ReferenceCountData incrementReference(final long coord) {
|
||||
- ReferenceCountData referenceCountData = this.referenceCounters.get(coord);
|
||||
+ if (curr == (curr = ret.compareAndExchange(curr, curr + 1L))) {
|
||||
+ return ret;
|
||||
+ }
|
||||
|
||||
- if (referenceCountData != null && referenceCountData.addCount()) {
|
||||
- this.ensureTicketAdded(coord, referenceCountData);
|
||||
- return referenceCountData;
|
||||
+ ++failures;
|
||||
+ }
|
||||
}
|
||||
|
||||
- referenceCountData = this.referenceCounters.compute(coord, (final long keyInMap, final ReferenceCountData valueInMap) -> {
|
||||
+ // slow acquire
|
||||
+ final int chunkX = CoordinateUtils.getChunkX(coord);
|
||||
+ final int chunkZ = CoordinateUtils.getChunkZ(coord);
|
||||
+ final ca.spottedleaf.concurrentutil.lock.ReentrantAreaLock.Node ticketLock = this.world.moonrise$getChunkTaskScheduler().chunkHolderManager.ticketLockArea.lock(chunkX, chunkZ);
|
||||
+ final AtomicLong ret2;
|
||||
+ final boolean processTicketUpdates;
|
||||
+ try {
|
||||
+ final AtomicLong replace = new AtomicLong(1L);
|
||||
+ final AtomicLong valueInMap = this.referenceCounters.putIfAbsent(coord, replace);
|
||||
if (valueInMap == null) {
|
||||
- // sets reference count to 1
|
||||
- return new ReferenceCountData();
|
||||
- }
|
||||
- // OK if we add from 0, the remove call will use compute() and catch this race condition
|
||||
- valueInMap.referenceCount.getAndIncrement();
|
||||
-
|
||||
- return valueInMap;
|
||||
- });
|
||||
-
|
||||
- this.ensureTicketAdded(coord, referenceCountData);
|
||||
-
|
||||
- return referenceCountData;
|
||||
- }
|
||||
- }
|
||||
-
|
||||
- private static final class ReferenceCountData {
|
||||
-
|
||||
- public final AtomicLong referenceCount = new AtomicLong(1L);
|
||||
- public volatile boolean addedTicket;
|
||||
+ // replaced, we should usually be here
|
||||
+ this.addTicket(coord);
|
||||
+ ret2 = replace;
|
||||
+ processTicketUpdates = true;
|
||||
+ } else {
|
||||
+ processTicketUpdates = false;
|
||||
+ int failures = 0;
|
||||
+ for (long curr = valueInMap.get();;) {
|
||||
+ if (curr == 0L) {
|
||||
+ // don't need to add ticket here, since ticket is only removed during the lock
|
||||
+ // we just need to replace the value in the map so that the thread removing fails and doesn't
|
||||
+ // remove the ticket (see decrementReference)
|
||||
+ this.referenceCounters.put(coord, replace);
|
||||
+ ret2 = replace;
|
||||
+ break;
|
||||
+ }
|
||||
|
||||
- // returns false if reference count is 0, otherwise increments ref count
|
||||
- public boolean addCount() {
|
||||
- int failures = 0;
|
||||
- for (long curr = this.referenceCount.get();;) {
|
||||
- for (int i = 0; i < failures; ++i) {
|
||||
- Thread.onSpinWait();
|
||||
- }
|
||||
+ for (int i = 0; i < failures; ++i) {
|
||||
+ ConcurrentUtil.backoff();
|
||||
+ }
|
||||
|
||||
- if (curr == 0L) {
|
||||
- return false;
|
||||
- }
|
||||
+ if (curr == (curr = valueInMap.compareAndExchange(curr, curr + 1L))) {
|
||||
+ // acquired
|
||||
+ ret2 = valueInMap;
|
||||
+ break;
|
||||
+ }
|
||||
|
||||
- if (curr == (curr = this.referenceCount.compareAndExchange(curr, curr + 1L))) {
|
||||
- return true;
|
||||
+ ++failures;
|
||||
+ }
|
||||
}
|
||||
-
|
||||
- ++failures;
|
||||
+ } finally {
|
||||
+ this.world.moonrise$getChunkTaskScheduler().chunkHolderManager.ticketLockArea.unlock(ticketLock);
|
||||
}
|
||||
- }
|
||||
|
||||
- // returns true if new reference count is 0
|
||||
- public boolean decreaseReferenceCount() {
|
||||
- final long res = this.referenceCount.decrementAndGet();
|
||||
- if (res >= 0L) {
|
||||
- return res == 0L;
|
||||
- } else {
|
||||
- throw new IllegalStateException("Negative reference count");
|
||||
+ if (processTicketUpdates) {
|
||||
+ this.processTicketUpdates(coord);
|
||||
}
|
||||
+
|
||||
+ return ret2;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -240,10 +241,10 @@ public final class RegionizedTaskQueue {
|
||||
void split(final ThreadedRegionizer<TickRegions.TickRegionData, TickRegions.TickRegionSectionData> regioniser,
|
||||
final Long2ReferenceOpenHashMap<ThreadedRegionizer.ThreadedRegion<TickRegions.TickRegionData, TickRegions.TickRegionSectionData>> into) {
|
||||
this.tickTaskQueue.split(
|
||||
- false, regioniser, into
|
||||
+ false, regioniser, into
|
||||
);
|
||||
this.chunkQueue.split(
|
||||
- true, regioniser, into
|
||||
+ true, regioniser, into
|
||||
);
|
||||
}
|
||||
|
||||
@@ -340,7 +341,7 @@ public final class RegionizedTaskQueue {
|
||||
final ThreadedRegionizer<TickRegions.TickRegionData, TickRegions.TickRegionSectionData> regioniser,
|
||||
final Long2ReferenceOpenHashMap<ThreadedRegionizer.ThreadedRegion<TickRegions.TickRegionData, TickRegions.TickRegionSectionData>> into) {
|
||||
final Reference2ReferenceOpenHashMap<ThreadedRegionizer.ThreadedRegion<TickRegions.TickRegionData, TickRegions.TickRegionSectionData>, ArrayDeque<ChunkBasedPriorityTask>[]>
|
||||
- split = new Reference2ReferenceOpenHashMap<>();
|
||||
+ split = new Reference2ReferenceOpenHashMap<>();
|
||||
final int shift = regioniser.sectionChunkShift;
|
||||
synchronized (this) {
|
||||
this.isDestroyed = true;
|
||||
@@ -356,7 +357,7 @@ public final class RegionizedTaskQueue {
|
||||
final int sectionZ = task.chunkZ >> shift;
|
||||
final long sectionKey = CoordinateUtils.getChunkKey(sectionX, sectionZ);
|
||||
final ThreadedRegionizer.ThreadedRegion<TickRegions.TickRegionData, TickRegions.TickRegionSectionData>
|
||||
- region = into.get(sectionKey);
|
||||
+ region = into.get(sectionKey);
|
||||
if (region == null) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
@@ -378,7 +379,7 @@ public final class RegionizedTaskQueue {
|
||||
iterator = split.reference2ReferenceEntrySet().fastIterator();
|
||||
iterator.hasNext();) {
|
||||
final Reference2ReferenceMap.Entry<ThreadedRegionizer.ThreadedRegion<TickRegions.TickRegionData, TickRegions.TickRegionSectionData>, ArrayDeque<ChunkBasedPriorityTask>[]>
|
||||
- entry = iterator.next();
|
||||
+ entry = iterator.next();
|
||||
final RegionTaskQueueData taskQueueData = entry.getKey().getData().getTaskQueueData();
|
||||
mergeInto(isChunkData ? taskQueueData.chunkQueue : taskQueueData.tickTaskQueue, entry.getValue());
|
||||
}
|
||||
@@ -408,7 +409,7 @@ public final class RegionizedTaskQueue {
|
||||
final ArrayDeque<ChunkBasedPriorityTask>[] queues = this.queues;
|
||||
final int max = Priority.IDLE.priority;
|
||||
ChunkBasedPriorityTask task = null;
|
||||
- ReferenceCountData referenceCounter = null;
|
||||
+ AtomicLong referenceCounter = null;
|
||||
synchronized (this) {
|
||||
if (this.isDestroyed) {
|
||||
throw new IllegalStateException("Attempting to poll from dead queue");
|
||||
@@ -440,10 +441,9 @@ public final class RegionizedTaskQueue {
|
||||
|
||||
private static final class ChunkBasedPriorityTask implements PrioritisedExecutor.PrioritisedTask {
|
||||
|
||||
- private static final ReferenceCountData REFERENCE_COUNTER_NOT_SET = new ReferenceCountData();
|
||||
- static {
|
||||
- REFERENCE_COUNTER_NOT_SET.referenceCount.set((long)Integer.MIN_VALUE);
|
||||
- }
|
||||
+ private static final AtomicLong REFERENCE_COUNTER_NOT_SET = new AtomicLong(-1L);
|
||||
+
|
||||
+
|
||||
|
||||
private final WorldRegionTaskData world;
|
||||
private final int chunkX;
|
||||
@@ -451,8 +451,8 @@ public final class RegionizedTaskQueue {
|
||||
private final long sectionLowerLeftCoord; // chunk coordinate
|
||||
private final boolean isChunkTask;
|
||||
|
||||
- private volatile ReferenceCountData referenceCounter;
|
||||
- private static final VarHandle REFERENCE_COUNTER_HANDLE = ConcurrentUtil.getVarHandle(ChunkBasedPriorityTask.class, "referenceCounter", ReferenceCountData.class);
|
||||
+ private volatile AtomicLong referenceCounter;
|
||||
+ private static final VarHandle REFERENCE_COUNTER_HANDLE = ConcurrentUtil.getVarHandle(ChunkBasedPriorityTask.class, "referenceCounter", AtomicLong.class);
|
||||
private Runnable run;
|
||||
private volatile Priority priority;
|
||||
private static final VarHandle PRIORITY_HANDLE = ConcurrentUtil.getVarHandle(ChunkBasedPriorityTask.class, "priority", Priority.class);
|
||||
@@ -489,16 +489,16 @@ public final class RegionizedTaskQueue {
|
||||
return (Priority)PRIORITY_HANDLE.compareAndExchange(this, expect, update);
|
||||
}
|
||||
|
||||
- private void setReferenceCounterPlain(final ReferenceCountData value) {
|
||||
+ private void setReferenceCounterPlain(final AtomicLong value) {
|
||||
REFERENCE_COUNTER_HANDLE.set(this, value);
|
||||
}
|
||||
|
||||
- private ReferenceCountData getReferenceCounterVolatile() {
|
||||
- return (ReferenceCountData)REFERENCE_COUNTER_HANDLE.get(this);
|
||||
+ private AtomicLong getReferenceCounterVolatile() {
|
||||
+ return (AtomicLong)REFERENCE_COUNTER_HANDLE.get(this);
|
||||
}
|
||||
|
||||
- private ReferenceCountData compareAndExchangeReferenceCounter(final ReferenceCountData expect, final ReferenceCountData update) {
|
||||
- return (ReferenceCountData)REFERENCE_COUNTER_HANDLE.compareAndExchange(this, expect, update);
|
||||
+ private AtomicLong compareAndExchangeReferenceCounter(final AtomicLong expect, final AtomicLong update) {
|
||||
+ return (AtomicLong)REFERENCE_COUNTER_HANDLE.compareAndExchange(this, expect, update);
|
||||
}
|
||||
|
||||
private void executeInternal() {
|
||||
@@ -515,7 +515,7 @@ public final class RegionizedTaskQueue {
|
||||
|
||||
private boolean tryComplete(final boolean cancel) {
|
||||
int failures = 0;
|
||||
- for (ReferenceCountData curr = this.getReferenceCounterVolatile();;) {
|
||||
+ for (AtomicLong curr = this.getReferenceCounterVolatile();;) {
|
||||
if (curr == null) {
|
||||
return false;
|
||||
}
|
||||
@@ -564,7 +564,7 @@ public final class RegionizedTaskQueue {
|
||||
return false;
|
||||
}
|
||||
|
||||
- final ReferenceCountData referenceCounter = this.world.incrementReference(this.sectionLowerLeftCoord);
|
||||
+ final AtomicLong referenceCounter = this.world.incrementReference(this.sectionLowerLeftCoord);
|
||||
if (this.compareAndExchangeReferenceCounter(REFERENCE_COUNTER_NOT_SET, referenceCounter) != REFERENCE_COUNTER_NOT_SET) {
|
||||
// we don't expect race conditions here, so it is OK if we have to needlessly reference count
|
||||
this.world.decrementReference(referenceCounter, this.sectionLowerLeftCoord);
|
||||
@@ -614,7 +614,7 @@ public final class RegionizedTaskQueue {
|
||||
}
|
||||
}
|
||||
|
||||
- private ReferenceCountData trySetCompleting(final int minPriority) {
|
||||
+ private AtomicLong trySetCompleting(final int minPriority) {
|
||||
// first, try to set priority to EXECUTING
|
||||
for (Priority curr = this.getPriorityVolatile();;) {
|
||||
if (curr.isLowerPriority(minPriority)) {
|
||||
@@ -626,7 +626,7 @@ public final class RegionizedTaskQueue {
|
||||
} // else: continue
|
||||
}
|
||||
|
||||
- for (ReferenceCountData curr = this.getReferenceCounterVolatile();;) {
|
||||
+ for (AtomicLong curr = this.getReferenceCounterVolatile();;) {
|
||||
if (curr == null) {
|
||||
// something acquired before us
|
||||
return null;
|
||||
@@ -639,7 +639,6 @@ public final class RegionizedTaskQueue {
|
||||
if (curr != (curr = this.compareAndExchangeReferenceCounter(curr, null))) {
|
||||
continue;
|
||||
}
|
||||
-
|
||||
return curr;
|
||||
}
|
||||
}
|
||||
@@ -647,7 +646,7 @@ public final class RegionizedTaskQueue {
|
||||
private void updatePriorityInQueue() {
|
||||
boolean synchronise = false;
|
||||
for (;;) {
|
||||
- final ReferenceCountData referenceCount = this.getReferenceCounterVolatile();
|
||||
+ final AtomicLong referenceCount = this.getReferenceCounterVolatile();
|
||||
if (referenceCount == REFERENCE_COUNTER_NOT_SET || referenceCount == null) {
|
||||
// cancelled or not queued
|
||||
return;
|
||||
Reference in New Issue
Block a user