9
0
mirror of https://github.com/Dreeam-qwq/Gale.git synced 2025-12-27 02:29:11 +00:00

Only notify one thread per unlocking event

This commit is contained in:
Martijn Muijsers
2023-02-06 21:15:26 +01:00
parent 25e9f8a3c5
commit ac4ce86d65
2 changed files with 77 additions and 30 deletions

View File

@@ -2750,10 +2750,10 @@ index 0000000000000000000000000000000000000000..686e16da8372085196d8f92adb881f82
+}
diff --git a/src/main/java/org/galemc/gale/executor/lock/YieldingLock.java b/src/main/java/org/galemc/gale/executor/lock/YieldingLock.java
new file mode 100644
index 0000000000000000000000000000000000000000..4b40afe88f166491f74f99bab96092d04e77fe02
index 0000000000000000000000000000000000000000..44b8bd5fd9a3ee2e484c81104523ba956f8d982f
--- /dev/null
+++ b/src/main/java/org/galemc/gale/executor/lock/YieldingLock.java
@@ -0,0 +1,202 @@
@@ -0,0 +1,213 @@
+// Gale - base thread pool
+
+package org.galemc.gale.executor.lock;
@@ -2763,6 +2763,7 @@ index 0000000000000000000000000000000000000000..4b40afe88f166491f74f99bab96092d0
+import org.galemc.gale.executor.annotation.PotentiallyYielding;
+import org.galemc.gale.executor.annotation.YieldFree;
+import org.galemc.gale.executor.thread.AbstractYieldingThread;
+import org.galemc.gale.executor.thread.BaseThread;
+import org.galemc.gale.executor.thread.pool.BaseThreadActivation;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
@@ -2808,6 +2809,13 @@ index 0000000000000000000000000000000000000000..4b40afe88f166491f74f99bab96092d0
+ */
+ private boolean canBeHeld;
+
+ /**
+ * Whether any threads can be {@linkplain BaseThread#signal signalled} for this lock being unlocked.
+ * This is set to false when a thread is signalled, until the thread has actually attempted to
+ * acquire the lock, so that we do not activate multiple threads for the same unlocking event.
+ */
+ public volatile boolean canBeSignalledFor = true;
+
+ public YieldingLock(Lock innerLock) {
+ this.innerLock = innerLock;
+ if (innerLock instanceof CheckableLock checkableLock) {
@@ -2885,12 +2893,15 @@ index 0000000000000000000000000000000000000000..4b40afe88f166491f74f99bab96092d0
+ yieldingThread.decrementHeldYieldingLockCount();
+ }
+ }
+ // Potentially signal a thread that this lock has become available.
+ // Another thread could also acquire the lock at this moment, so when we signal the thread we obtain below,
+ // it may already be too late for the polled thread to acquire this lock
+ // (but note that the same thread cannot have been added again because only the thread itself can do that -
+ // and it is still waiting).
+ if (this.hasWaitingThreads()) {
+ /*
+ Potentially signal a thread that this lock has become available.
+ Another thread could also acquire the lock at this moment, so when we signal the thread we obtain below,
+ it may already be too late for the polled thread to acquire this lock
+ (but note that the same thread cannot have been added again because only the thread itself can do that -
+ and it is still waiting).
+ Also note that this lock may still be locked (since it may be reentrant) so we check the locked state.
+ */
+ if (!this.isLocked() && this.hasWaitingThreads()) {
+ BaseThreadActivation.yieldingLockWithWaitingThreadsWasUnlocked();
+ }
+ }
@@ -4427,10 +4438,10 @@ index 0000000000000000000000000000000000000000..eab769d7319f26db1f4db9599a3c263c
+}
diff --git a/src/main/java/org/galemc/gale/executor/thread/BaseThread.java b/src/main/java/org/galemc/gale/executor/thread/BaseThread.java
new file mode 100644
index 0000000000000000000000000000000000000000..f0bf8cfc28107d3f2421d033e45f1a88f8d86d5b
index 0000000000000000000000000000000000000000..76ca22e30b41e4a39ed215cfb03c9a793eefcc09
--- /dev/null
+++ b/src/main/java/org/galemc/gale/executor/thread/BaseThread.java
@@ -0,0 +1,725 @@
@@ -0,0 +1,760 @@
+// Gale - base thread pool
+
+package org.galemc.gale.executor.thread;
@@ -4594,6 +4605,13 @@ index 0000000000000000000000000000000000000000..f0bf8cfc28107d3f2421d033e45f1a88
+ public volatile @Nullable YieldingLock lockWaitingFor = null;
+
+ /**
+ * The value of {@link #lockWaitingFor} during the last wait (a call to {@link Condition#await})
+ * or pre-wait check (while {@link #isNotActuallyWaitingYet} is true).
+ */
+ @ThisThreadOnly
+ private @Nullable YieldingLock lastLockWaitedFor = null;
+
+ /**
+ * A special flag, used after changing {@link #isWaiting}, when the lock must be temporarily released to
+ * call {@link BaseThreadActivation#callForUpdate()} (to avoid deadlocks in {@link #signal} calls),
+ * and we wish the pool to regard this thread as waiting
@@ -4715,23 +4733,31 @@ index 0000000000000000000000000000000000000000..f0bf8cfc28107d3f2421d033e45f1a88
+ to be released.
+ */
+ while (true) {
+ if (timeoutTime != null && System.nanoTime() - timeoutTime >= 0) {
+ break;
+ }
+ if (stopCondition != null) {
+ if (this == MinecraftServer.serverThread) {
+ MinecraftServer.currentManagedBlockStopConditionHasBecomeTrue = false;
+ try {
+ if (timeoutTime != null && System.nanoTime() - timeoutTime >= 0) {
+ break;
+ }
+ if (stopCondition.getAsBoolean()) {
+ if (stopCondition != null) {
+ if (this == MinecraftServer.serverThread) {
+ MinecraftServer.currentManagedBlockStopConditionHasBecomeTrue = true;
+ MinecraftServer.currentManagedBlockStopConditionHasBecomeTrue = false;
+ }
+ if (stopCondition.getAsBoolean()) {
+ if (this == MinecraftServer.serverThread) {
+ MinecraftServer.currentManagedBlockStopConditionHasBecomeTrue = true;
+ }
+ break;
+ }
+ } else {
+ //noinspection ConstantConditions
+ if (yieldingLock.tryLock()) {
+ break;
+ }
+ break;
+ }
+ } else {
+ //noinspection ConstantConditions
+ if (yieldingLock.tryLock()) {
+ break;
+ } finally {
+ // Make sure other threads can be signalled for the last waited-for lock again
+ if (this.lastLockWaitedFor != null) {
+ this.lastLockWaitedFor.canBeSignalledFor = true;
+ this.lastLockWaitedFor = null;
+ }
+ }
+
@@ -4997,6 +5023,7 @@ index 0000000000000000000000000000000000000000..f0bf8cfc28107d3f2421d033e45f1a88
+ // If it was set that this thread should skip the wait in the meantime, skip it
+ if (this.skipNextWait) {
+ this.isWaiting = false;
+ this.lastLockWaitedFor = this.lockWaitingFor;
+ this.lockWaitingFor = null;
+ mustCallPoolUpdateAtEnd = true;
+ break waitWithLock;
@@ -5065,6 +5092,7 @@ index 0000000000000000000000000000000000000000..f0bf8cfc28107d3f2421d033e45f1a88
+
+ // Unmark this thread as waiting
+ this.isWaiting = false;
+ this.lastLockWaitedFor = this.lockWaitingFor;
+ this.lockWaitingFor = null;
+ mustCallPoolUpdateAtEnd = true;
+
@@ -5091,6 +5119,21 @@ index 0000000000000000000000000000000000000000..f0bf8cfc28107d3f2421d033e45f1a88
+ }
+
+ /**
+ * An auxiliary method for exclusive use in {@link #signal}, that marks the {@link YieldingLock}
+ * that this thread is waiting for as having been signalled for, so that no other threads
+ * are also signalled for it.
+ * <br>
+ * This must be called when {@link #signal} returns true, and must be called before any other
+ * actions relating to the signalling of this thread are performed.
+ */
+ private void markLockWaitingForAsSignalledFor() {
+ @Nullable YieldingLock lockWaitingFor = this.lockWaitingFor;
+ if (lockWaitingFor != null) {
+ lockWaitingFor.canBeSignalledFor = false;
+ }
+ }
+
+ /**
+ * Signals this thread to wake up, or if it was not sleeping but attempting to poll a task:
+ * to not go to sleep the next time no task could be polled, and instead try polling a task again.
+ *
@@ -5110,6 +5153,7 @@ index 0000000000000000000000000000000000000000..f0bf8cfc28107d3f2421d033e45f1a88
+ if (this.isWaiting) {
+ if (this.isNotActuallyWaitingYet) {
+ if (!this.skipNextWait) {
+ this.markLockWaitingForAsSignalledFor();
+ this.lastSignalReason = reason;
+ this.skipNextWait = true;
+ return true;
@@ -5117,6 +5161,7 @@ index 0000000000000000000000000000000000000000..f0bf8cfc28107d3f2421d033e45f1a88
+ return false;
+ }
+ if (!this.mayBeStillWaitingButHasBeenSignalled) {
+ this.markLockWaitingForAsSignalledFor();
+ this.lastSignalReason = reason;
+ this.mayBeStillWaitingButHasBeenSignalled = true;
+ this.waitCondition.signal();
@@ -5124,6 +5169,7 @@ index 0000000000000000000000000000000000000000..f0bf8cfc28107d3f2421d033e45f1a88
+ }
+ } else if (this.isPollingTaskOrCheckingStopCondition) {
+ if (!this.skipNextWait) {
+ this.markLockWaitingForAsSignalledFor();
+ this.lastSignalReason = reason;
+ this.skipNextWait = true;
+ return true;
@@ -5629,10 +5675,10 @@ index 0000000000000000000000000000000000000000..77fe10e51b00115da520cfc211bf84ba
+}
diff --git a/src/main/java/org/galemc/gale/executor/thread/pool/BaseThreadActivation.java b/src/main/java/org/galemc/gale/executor/thread/pool/BaseThreadActivation.java
new file mode 100644
index 0000000000000000000000000000000000000000..5554f23c4cca4e9f0be036e11c9e2c69ad6293ce
index 0000000000000000000000000000000000000000..20818236c21bc6f42642ddd788cbca38d0ca1051
--- /dev/null
+++ b/src/main/java/org/galemc/gale/executor/thread/pool/BaseThreadActivation.java
@@ -0,0 +1,593 @@
@@ -0,0 +1,594 @@
+// Gale - base thread pool
+
+package org.galemc.gale.executor.thread.pool;
@@ -5909,7 +5955,7 @@ index 0000000000000000000000000000000000000000..5554f23c4cca4e9f0be036e11c9e2c69
+ // There is no point in activating the thread because it is not waiting
+ return false;
+ }
+ if (lockWaitingFor != null && !lockWaitingFor.isLocked()) {
+ if (lockWaitingFor != null && !lockWaitingFor.isLocked() && lockWaitingFor.canBeSignalledFor) {
+ // Activating the thread would be useful because there is a lock that can be acquired
+ return true;
+ }
@@ -6010,7 +6056,7 @@ index 0000000000000000000000000000000000000000..5554f23c4cca4e9f0be036e11c9e2c69
+ int tierOrdinal = thread.baseThreadIndex > 0 ? (tier == null ? -1 : tier.ordinal) : BaseTaskQueueTier.SERVER.ordinal;
+ if (thread.isWaitingAndNeedsSignal()) {
+ var lockWaitingFor = thread.lockWaitingFor;
+ if (lockWaitingFor != null && !lockWaitingFor.isLocked()) {
+ if (lockWaitingFor != null && !lockWaitingFor.isLocked() && lockWaitingFor.canBeSignalledFor) {
+ threadsWaitingForUnlockedLockForTier[tierOrdinal].add(thread);
+ }
+ } else {
@@ -6160,7 +6206,7 @@ index 0000000000000000000000000000000000000000..5554f23c4cca4e9f0be036e11c9e2c69
+ */
+ var highestTierOfTaskOnStack = thread.highestTierOfTaskOnStack;
+ var highestTierOfTaskOnStackOrdinalOrLength = highestTierOfTaskOnStack == null ? BaseTaskQueueTier.length : highestTierOfTaskOnStack.ordinal;
+ boolean isThreadWaitingForAvailableYieldingLock = lockWaitingFor != null && !lockWaitingFor.isLocked();
+ boolean isThreadWaitingForAvailableYieldingLock = lockWaitingFor != null && !lockWaitingFor.isLocked() && lockWaitingFor.canBeSignalledFor;
+ if (isThreadWaitingForAvailableYieldingLock || highestTierOfTaskOnStack == null || highestTierOfTaskOnStack.ordinal >= tierI) {
+ boolean isBestChoice = false;
+ int yieldDepth = thread.yieldDepth;
@@ -6199,6 +6245,7 @@ index 0000000000000000000000000000000000000000..5554f23c4cca4e9f0be036e11c9e2c69
+ if (couldBeUsefullyActivatedForTasksOrLock(thread, lockWaitingFor, tierI == 0)) {
+ // Wake up the thread
+ if (thread.signal(thereAreTasks ? SignalReason.TASK : SignalReason.YIELDING_LOCK)) {
+ // Make sure no other threads are activated for the same unlocking event
+ // Do another update
+ madeChangesInLastIteration = true;
+ continue updateWhileNecessary;

View File

@@ -653,7 +653,7 @@ index 0000000000000000000000000000000000000000..9c36119b03ba9a20dc6994ef2d2704eb
+
+}
diff --git a/src/main/java/org/galemc/gale/executor/thread/BaseThread.java b/src/main/java/org/galemc/gale/executor/thread/BaseThread.java
index f0bf8cfc28107d3f2421d033e45f1a88f8d86d5b..3e293dbd4ea8336a6239df6ee60bff9ddd6a33d3 100644
index 76ca22e30b41e4a39ed215cfb03c9a793eefcc09..f142f726663686ac475f64a1ffcbac145c08f5ae 100644
--- a/src/main/java/org/galemc/gale/executor/thread/BaseThread.java
+++ b/src/main/java/org/galemc/gale/executor/thread/BaseThread.java
@@ -2,6 +2,8 @@
@@ -673,7 +673,7 @@ index f0bf8cfc28107d3f2421d033e45f1a88f8d86d5b..3e293dbd4ea8336a6239df6ee60bff9d
import org.jetbrains.annotations.Nullable;
import java.util.concurrent.TimeUnit;
@@ -702,6 +705,20 @@ public abstract class BaseThread extends Thread implements AbstractYieldingThrea
@@ -737,6 +740,20 @@ public abstract class BaseThread extends Thread implements AbstractYieldingThrea
}
}