mirror of
https://github.com/Dreeam-qwq/Gale.git
synced 2025-12-27 02:29:11 +00:00
Use spin locks with easier syntax
This commit is contained in:
@@ -8,10 +8,10 @@ Gale - https://galemc.org
|
||||
|
||||
diff --git a/src/main/java/org/galemc/gale/concurrent/Mutex.java b/src/main/java/org/galemc/gale/concurrent/Mutex.java
|
||||
new file mode 100644
|
||||
index 0000000000000000000000000000000000000000..65ec8cf910575dfa4c5024ec69b3be1ef2634722
|
||||
index 0000000000000000000000000000000000000000..e8ab6cd25909a848c1826824a0f04dc593a26be7
|
||||
--- /dev/null
|
||||
+++ b/src/main/java/org/galemc/gale/concurrent/Mutex.java
|
||||
@@ -0,0 +1,76 @@
|
||||
@@ -0,0 +1,121 @@
|
||||
+// Gale - mutex utility
|
||||
+
|
||||
+package org.galemc.gale.concurrent;
|
||||
@@ -30,11 +30,13 @@ index 0000000000000000000000000000000000000000..65ec8cf910575dfa4c5024ec69b3be1e
|
||||
+ * {@link Lock#tryLock} and {@link Lock#unlock} methods are simply deferred to their usual mutex versions,
|
||||
+ * respectively {@link #acquireUninterruptibly}, {@link #acquire}, {@link #tryAcquire} and
|
||||
+ * {@link #release}. The {@link Lock#newCondition} method does not have a default implementation.
|
||||
+ * <br>
|
||||
+ * This interface extends {@link AutoCloseable}, where {@link #close()} calls {@link #release()}.
|
||||
+ *
|
||||
+ * @author Martijn Muijsers
|
||||
+ */
|
||||
+@AnyThreadSafe
|
||||
+public interface Mutex extends Lock {
|
||||
+public interface Mutex extends Lock, AutoCloseable {
|
||||
+
|
||||
+ void acquireUninterruptibly();
|
||||
+
|
||||
@@ -72,6 +74,49 @@ index 0000000000000000000000000000000000000000..65ec8cf910575dfa4c5024ec69b3be1e
|
||||
+ }
|
||||
+
|
||||
+ /**
|
||||
+ * Acquires this mutex in the style of a spin lock: repeatedly calls {@link #tryAcquire()} until successful.
|
||||
+ */
|
||||
+ default void spinLock() {
|
||||
+ //noinspection StatementWithEmptyBody
|
||||
+ while (!this.tryAcquire());
|
||||
+ }
|
||||
+
|
||||
+ /**
|
||||
+ * Acquires this mutex with {@link #acquire()}, then returns this instance.
|
||||
+ * <br>
|
||||
+ * This can be used in the following way:
|
||||
+ * <p>
|
||||
+ * <code>
|
||||
+ * try (mutex.withLock()) {<br>
|
||||
+ * // code<br>
|
||||
+ * }
|
||||
+ * </code>
|
||||
+ * </p>
|
||||
+ *
|
||||
+ * @return This {@link Mutex}, which implements {@link AutoCloseable}.
|
||||
+ */
|
||||
+ default Mutex withLock() throws InterruptedException {
|
||||
+ this.acquire();
|
||||
+ return this;
|
||||
+ }
|
||||
+
|
||||
+ /**
|
||||
+ * Acquires this mutex with {@link #spinLock()}, then returns this instance.
|
||||
+ *
|
||||
+ * @return This {@link Mutex}, which implements {@link AutoCloseable}.
|
||||
+ * @see #withLock()
|
||||
+ */
|
||||
+ default Mutex withSpinLock() {
|
||||
+ this.spinLock();
|
||||
+ return this;
|
||||
+ }
|
||||
+
|
||||
+ @Override
|
||||
+ default void close() {
|
||||
+ this.release();
|
||||
+ }
|
||||
+
|
||||
+ /**
|
||||
+ * Instantiates a new {@link Mutex}, with the default implementation
|
||||
+ * that should be geared towards performance.
|
||||
+ */
|
||||
|
||||
@@ -1851,18 +1851,18 @@ index 78f53ee557276de85f0431ebcb146445b1f4fb92..6176867eea06c53882dcaacfbde0334b
|
||||
|
||||
return ret;
|
||||
diff --git a/src/main/java/org/galemc/gale/concurrent/Mutex.java b/src/main/java/org/galemc/gale/concurrent/Mutex.java
|
||||
index 65ec8cf910575dfa4c5024ec69b3be1ef2634722..174c248aa706f6b5f3e248cb7604b44a4d508967 100644
|
||||
index e8ab6cd25909a848c1826824a0f04dc593a26be7..fdad4a7a15d4af427e1441dcc3d837faf1189f85 100644
|
||||
--- a/src/main/java/org/galemc/gale/concurrent/Mutex.java
|
||||
+++ b/src/main/java/org/galemc/gale/concurrent/Mutex.java
|
||||
@@ -17,7 +17,7 @@ import java.util.concurrent.locks.Lock;
|
||||
* respectively {@link #acquireUninterruptibly}, {@link #acquire}, {@link #tryAcquire} and
|
||||
* {@link #release}. The {@link Lock#newCondition} method does not have a default implementation.
|
||||
@@ -19,7 +19,7 @@ import java.util.concurrent.locks.Lock;
|
||||
* <br>
|
||||
* This interface extends {@link AutoCloseable}, where {@link #close()} calls {@link #release()}.
|
||||
*
|
||||
- * @author Martijn Muijsers
|
||||
+ * @author Martijn Muijsers under AGPL-3.0
|
||||
*/
|
||||
@AnyThreadSafe
|
||||
public interface Mutex extends Lock {
|
||||
public interface Mutex extends Lock, AutoCloseable {
|
||||
diff --git a/src/main/java/org/galemc/gale/concurrent/SemaphoreMutex.java b/src/main/java/org/galemc/gale/concurrent/SemaphoreMutex.java
|
||||
index 2e31501d26b141729c80975e97a23b09653ba3bf..5a454236073dd75ed36d058c0f033c4aada403e3 100644
|
||||
--- a/src/main/java/org/galemc/gale/concurrent/SemaphoreMutex.java
|
||||
|
||||
@@ -7,7 +7,7 @@ License: AGPL-3.0 (https://www.gnu.org/licenses/agpl-3.0.html)
|
||||
Gale - https://galemc.org
|
||||
|
||||
diff --git a/src/main/java/ca/spottedleaf/concurrentutil/executor/standard/PrioritisedThreadPool.java b/src/main/java/ca/spottedleaf/concurrentutil/executor/standard/PrioritisedThreadPool.java
|
||||
index 26fa2caa18a9194e57574a4a7fa9f7a4265740e0..ae335a8ab04f4dc08ca0224a4ee70fa9b2567fe0 100644
|
||||
index 26fa2caa18a9194e57574a4a7fa9f7a4265740e0..2fa56c06ec16817d5e35d5283b8e5a0d0e01d643 100644
|
||||
--- a/src/main/java/ca/spottedleaf/concurrentutil/executor/standard/PrioritisedThreadPool.java
|
||||
+++ b/src/main/java/ca/spottedleaf/concurrentutil/executor/standard/PrioritisedThreadPool.java
|
||||
@@ -2,11 +2,17 @@ package ca.spottedleaf.concurrentutil.executor.standard;
|
||||
@@ -32,7 +32,7 @@ index 26fa2caa18a9194e57574a4a7fa9f7a4265740e0..ae335a8ab04f4dc08ca0224a4ee70fa9
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
@@ -14,8 +20,9 @@ public final class PrioritisedThreadPool {
|
||||
@@ -14,13 +20,16 @@ public final class PrioritisedThreadPool {
|
||||
|
||||
private static final Logger LOGGER = LogUtils.getLogger();
|
||||
|
||||
@@ -44,7 +44,14 @@ index 26fa2caa18a9194e57574a4a7fa9f7a4265740e0..ae335a8ab04f4dc08ca0224a4ee70fa9
|
||||
protected final String name;
|
||||
protected final long queueMaxHoldTime;
|
||||
|
||||
@@ -46,37 +53,14 @@ public final class PrioritisedThreadPool {
|
||||
protected final ReferenceOpenHashSet<PrioritisedPoolExecutorImpl> nonShutdownQueues = new ReferenceOpenHashSet<>();
|
||||
+ protected final Mutex nonShutdownQueuesLock = Mutex.create(); // Gale - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
protected final ReferenceOpenHashSet<PrioritisedPoolExecutorImpl> activeQueues = new ReferenceOpenHashSet<>();
|
||||
+ protected final Mutex activeQueuesLock = Mutex.create(); // Gale - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
|
||||
protected boolean shutdown;
|
||||
|
||||
@@ -46,41 +55,18 @@ public final class PrioritisedThreadPool {
|
||||
}
|
||||
this.name = name;
|
||||
this.queueMaxHoldTime = queueHoldTime;
|
||||
@@ -67,15 +74,16 @@ index 26fa2caa18a9194e57574a4a7fa9f7a4265740e0..ae335a8ab04f4dc08ca0224a4ee70fa9
|
||||
- // now the thread can start
|
||||
- this.threads[i].start();
|
||||
- }
|
||||
- }
|
||||
-
|
||||
- public Thread[] getThreads() {
|
||||
- return Arrays.copyOf(this.threads, this.threads.length, Thread[].class);
|
||||
}
|
||||
|
||||
- public Thread[] getThreads() {
|
||||
- return Arrays.copyOf(this.threads, this.threads.length, Thread[].class);
|
||||
- }
|
||||
-
|
||||
- public PrioritisedPoolExecutor createExecutor(final String name, final int parallelism) {
|
||||
- synchronized (this.nonShutdownQueues) {
|
||||
+ public PrioritisedPoolExecutor createExecutor(final String name, boolean supportsParallelism) { // Gale - base thread pool - chunk worker task queue
|
||||
synchronized (this.nonShutdownQueues) {
|
||||
+ try (var ignored = nonShutdownQueuesLock.withSpinLock()) { // Gale - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
if (this.shutdown) {
|
||||
throw new IllegalStateException("Queue is shutdown: " + this.toString());
|
||||
}
|
||||
@@ -84,7 +92,12 @@ index 26fa2caa18a9194e57574a4a7fa9f7a4265740e0..ae335a8ab04f4dc08ca0224a4ee70fa9
|
||||
|
||||
this.nonShutdownQueues.add(ret);
|
||||
|
||||
@@ -88,136 +72,24 @@ public final class PrioritisedThreadPool {
|
||||
- synchronized (this.activeQueues) {
|
||||
+ try (var ignored2 = this.activeQueuesLock.withSpinLock()) { // Gale - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
this.activeQueues.add(ret);
|
||||
}
|
||||
|
||||
@@ -88,136 +74,24 @@ public final class PrioritisedThreadPool {
|
||||
}
|
||||
}
|
||||
|
||||
@@ -231,7 +244,7 @@ index 26fa2caa18a9194e57574a4a7fa9f7a4265740e0..ae335a8ab04f4dc08ca0224a4ee70fa9
|
||||
}
|
||||
|
||||
private boolean isAlertedHighPriority() {
|
||||
@@ -225,21 +97,23 @@ public final class PrioritisedThreadPool {
|
||||
@@ -225,21 +99,19 @@ public final class PrioritisedThreadPool {
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -252,15 +265,11 @@ index 26fa2caa18a9194e57574a4a7fa9f7a4265740e0..ae335a8ab04f4dc08ca0224a4ee70fa9
|
||||
final PrioritisedPoolExecutorImpl queue;
|
||||
// select queue
|
||||
- synchronized (queues) {
|
||||
+ // Gale start - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
+ //noinspection StatementWithEmptyBody
|
||||
+ while (!queuesLock.tryAcquire());
|
||||
+ try {
|
||||
+ // Gale end - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
+ try (var ignored = queuesLock.withSpinLock()) { // Gale - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
queue = queues.pollFirst();
|
||||
if (queue == null) {
|
||||
// no tasks to execute
|
||||
@@ -249,7 +123,7 @@ public final class PrioritisedThreadPool {
|
||||
@@ -249,7 +121,7 @@ public final class PrioritisedThreadPool {
|
||||
queue.schedulingId = ++pool.schedulingIdGenerator;
|
||||
// we own this queue now, so increment the executor count
|
||||
// do we also need to push this queue up for grabs for another executor?
|
||||
@@ -269,18 +278,7 @@ index 26fa2caa18a9194e57574a4a7fa9f7a4265740e0..ae335a8ab04f4dc08ca0224a4ee70fa9
|
||||
// re-add to queues
|
||||
// it's very important this is done in the same synchronised block for polling, as this prevents
|
||||
// us from possibly later adding a queue that should not exist in the set
|
||||
@@ -261,16 +135,17 @@ public final class PrioritisedThreadPool {
|
||||
// note: we cannot drain entries from the queue while holding this lock, as it will cause deadlock
|
||||
// the queue addition holds the per-queue lock first then acquires the lock we have now, but if we
|
||||
// try to poll now we don't hold the per queue lock but we do hold the global lock...
|
||||
+ // Gale start - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
+ } finally {
|
||||
+ queuesLock.release();
|
||||
}
|
||||
+ // Gale end - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
|
||||
// parse tasks as long as we are allowed
|
||||
final long start = System.nanoTime();
|
||||
@@ -268,9 +140,6 @@ public final class PrioritisedThreadPool {
|
||||
final long deadline = start + pool.queueMaxHoldTime;
|
||||
do {
|
||||
try {
|
||||
@@ -290,7 +288,7 @@ index 26fa2caa18a9194e57574a4a7fa9f7a4265740e0..ae335a8ab04f4dc08ca0224a4ee70fa9
|
||||
if (!queue.executeTask()) {
|
||||
// no more tasks, try next queue
|
||||
break;
|
||||
@@ -279,11 +154,15 @@ public final class PrioritisedThreadPool {
|
||||
@@ -279,11 +148,11 @@ public final class PrioritisedThreadPool {
|
||||
} catch (final ThreadDeath death) {
|
||||
throw death; // goodbye world...
|
||||
} catch (final Throwable throwable) {
|
||||
@@ -301,24 +299,11 @@ index 26fa2caa18a9194e57574a4a7fa9f7a4265740e0..ae335a8ab04f4dc08ca0224a4ee70fa9
|
||||
+ } while (!this.isAlertedHighPriority() && System.nanoTime() <= deadline && BaseThreadActivation.tierInExcessOrdinal > BaseTaskQueueTier.LOW_PRIORITY_ASYNC.ordinal); // Gale - base thread pool - chunk worker task queue
|
||||
|
||||
- synchronized (queues) {
|
||||
+ // Gale start - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
+ //noinspection StatementWithEmptyBody
|
||||
+ while (!queuesLock.tryAcquire());
|
||||
+ try {
|
||||
+ // Gale end - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
+ try (var ignored = queuesLock.withSpinLock()) { // Gale - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
// decrement executors, we are no longer executing
|
||||
if (queue.isQueued) {
|
||||
queues.remove(queue);
|
||||
@@ -301,11 +180,21 @@ public final class PrioritisedThreadPool {
|
||||
queues.add(queue);
|
||||
queue.isQueued = true;
|
||||
}
|
||||
+ // Gale start - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
+ } finally {
|
||||
+ queuesLock.release();
|
||||
}
|
||||
+ // Gale end - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
}
|
||||
@@ -306,6 +175,12 @@ public final class PrioritisedThreadPool {
|
||||
|
||||
return ret;
|
||||
}
|
||||
@@ -331,7 +316,7 @@ index 26fa2caa18a9194e57574a4a7fa9f7a4265740e0..ae335a8ab04f4dc08ca0224a4ee70fa9
|
||||
}
|
||||
|
||||
public interface PrioritisedPoolExecutor extends PrioritisedExecutor {
|
||||
@@ -322,7 +211,7 @@ public final class PrioritisedThreadPool {
|
||||
@@ -322,7 +197,7 @@ public final class PrioritisedThreadPool {
|
||||
public boolean isActive();
|
||||
}
|
||||
|
||||
@@ -340,33 +325,32 @@ index 26fa2caa18a9194e57574a4a7fa9f7a4265740e0..ae335a8ab04f4dc08ca0224a4ee70fa9
|
||||
|
||||
protected final PrioritisedThreadPool pool;
|
||||
protected final long[] priorityCounts = new long[Priority.TOTAL_SCHEDULABLE_PRIORITIES];
|
||||
@@ -369,7 +258,12 @@ public final class PrioritisedThreadPool {
|
||||
@@ -369,7 +244,10 @@ public final class PrioritisedThreadPool {
|
||||
public void halt() {
|
||||
final PrioritisedThreadPool pool = this.pool;
|
||||
final TreeSet<PrioritisedPoolExecutorImpl> queues = pool.queues;
|
||||
- synchronized (queues) {
|
||||
+ // Gale start - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
+ final Mutex queuesLock = pool.queuesLock;
|
||||
+ //noinspection StatementWithEmptyBody
|
||||
+ while (!queuesLock.tryAcquire());
|
||||
+ try {
|
||||
+ try (var ignored = queuesLock.withSpinLock()) {
|
||||
+ // Gale end - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
if (this.isHalted) {
|
||||
return;
|
||||
}
|
||||
@@ -378,7 +272,11 @@ public final class PrioritisedThreadPool {
|
||||
queues.remove(this);
|
||||
@@ -379,10 +257,10 @@ public final class PrioritisedThreadPool {
|
||||
this.isQueued = false;
|
||||
}
|
||||
+ // Gale start - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
+ } finally {
|
||||
+ queuesLock.release();
|
||||
}
|
||||
+ // Gale end - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
synchronized (pool.nonShutdownQueues) {
|
||||
- synchronized (pool.nonShutdownQueues) {
|
||||
+ try (var ignored = pool.nonShutdownQueuesLock.withSpinLock()) { // Gale - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
pool.nonShutdownQueues.remove(this);
|
||||
}
|
||||
@@ -391,8 +289,13 @@ public final class PrioritisedThreadPool {
|
||||
- synchronized (pool.activeQueues) {
|
||||
+ try (var ignored = pool.activeQueuesLock.withSpinLock()) { // Gale - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
pool.activeQueues.remove(this);
|
||||
}
|
||||
}
|
||||
@@ -391,12 +269,15 @@ public final class PrioritisedThreadPool {
|
||||
public boolean isActive() {
|
||||
final PrioritisedThreadPool pool = this.pool;
|
||||
final TreeSet<PrioritisedPoolExecutorImpl> queues = pool.queues;
|
||||
@@ -374,26 +358,17 @@ index 26fa2caa18a9194e57574a4a7fa9f7a4265740e0..ae335a8ab04f4dc08ca0224a4ee70fa9
|
||||
+ final Mutex queuesLock = pool.queuesLock;
|
||||
|
||||
- synchronized (queues) {
|
||||
+ //noinspection StatementWithEmptyBody
|
||||
+ while (!queuesLock.tryAcquire());
|
||||
+ try {
|
||||
+ try (var ignored = queuesLock.withSpinLock()) {
|
||||
+ // Gale end - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
if (this.concurrentExecutors != 0) {
|
||||
return true;
|
||||
}
|
||||
@@ -401,7 +304,11 @@ public final class PrioritisedThreadPool {
|
||||
- synchronized (pool.activeQueues) {
|
||||
+ try (var ignored2 = pool.activeQueuesLock.withSpinLock()) { // Gale - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
if (pool.activeQueues.contains(this)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
+ // Gale start - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
+ } finally {
|
||||
+ queuesLock.release();
|
||||
}
|
||||
+ // Gale end - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
|
||||
return false;
|
||||
}
|
||||
@@ -469,8 +376,13 @@ public final class PrioritisedThreadPool {
|
||||
@@ -469,8 +350,11 @@ public final class PrioritisedThreadPool {
|
||||
|
||||
final PrioritisedThreadPool pool = this.pool;
|
||||
final TreeSet<PrioritisedPoolExecutorImpl> queues = pool.queues;
|
||||
@@ -401,14 +376,12 @@ index 26fa2caa18a9194e57574a4a7fa9f7a4265740e0..ae335a8ab04f4dc08ca0224a4ee70fa9
|
||||
+ final Mutex queuesLock = pool.queuesLock;
|
||||
|
||||
- synchronized (queues) {
|
||||
+ //noinspection StatementWithEmptyBody
|
||||
+ while (!queuesLock.tryAcquire());
|
||||
+ try {
|
||||
+ try (var ignored = queuesLock.withSpinLock()) {
|
||||
+ // Gale end - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
if (!this.isQueued) {
|
||||
// see if we need to be queued
|
||||
if (newPriority != null) {
|
||||
@@ -478,7 +390,7 @@ public final class PrioritisedThreadPool {
|
||||
@@ -478,7 +362,7 @@ public final class PrioritisedThreadPool {
|
||||
this.schedulingId = ++pool.schedulingIdGenerator;
|
||||
}
|
||||
this.scheduledPriority = newPriority; // must be updated before queue add
|
||||
@@ -417,19 +390,14 @@ index 26fa2caa18a9194e57574a4a7fa9f7a4265740e0..ae335a8ab04f4dc08ca0224a4ee70fa9
|
||||
shouldNotifyHighPriority = newPriority.isHigherOrEqualPriority(Priority.HIGH);
|
||||
queues.add(this);
|
||||
this.isQueued = true;
|
||||
@@ -508,7 +420,11 @@ public final class PrioritisedThreadPool {
|
||||
} else {
|
||||
executorsWanted = 0;
|
||||
}
|
||||
+ // Gale start - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
+ } finally {
|
||||
+ queuesLock.release();
|
||||
@@ -511,19 +395,20 @@ public final class PrioritisedThreadPool {
|
||||
}
|
||||
+ // Gale end - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
|
||||
if (newPriority == null && shutdown) {
|
||||
synchronized (pool.activeQueues) {
|
||||
@@ -517,13 +433,14 @@ public final class PrioritisedThreadPool {
|
||||
- synchronized (pool.activeQueues) {
|
||||
+ try (var ignored = pool.activeQueuesLock.withSpinLock()) { // Gale - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
pool.activeQueues.remove(this);
|
||||
}
|
||||
}
|
||||
|
||||
// Wake up the number of executors we want
|
||||
@@ -449,7 +417,13 @@ index 26fa2caa18a9194e57574a4a7fa9f7a4265740e0..ae335a8ab04f4dc08ca0224a4ee70fa9
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -543,9 +460,14 @@ public final class PrioritisedThreadPool {
|
||||
@@ -538,17 +423,18 @@ public final class PrioritisedThreadPool {
|
||||
final PrioritisedThreadPool pool = this.pool;
|
||||
|
||||
// remove from active queues
|
||||
- synchronized (pool.nonShutdownQueues) {
|
||||
+ try (var ignored = pool.nonShutdownQueuesLock.withSpinLock()) { // Gale - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
pool.nonShutdownQueues.remove(this);
|
||||
}
|
||||
|
||||
final TreeSet<PrioritisedPoolExecutorImpl> queues = pool.queues;
|
||||
@@ -457,26 +431,14 @@ index 26fa2caa18a9194e57574a4a7fa9f7a4265740e0..ae335a8ab04f4dc08ca0224a4ee70fa9
|
||||
|
||||
// try and shift around our priority
|
||||
- synchronized (queues) {
|
||||
+ // Gale start - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
+ //noinspection StatementWithEmptyBody
|
||||
+ while (!queuesLock.tryAcquire());
|
||||
+ try {
|
||||
+ // Gale end - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
+ try (var ignored = queuesLock.withSpinLock()) { // Gale - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
if (this.scheduledPriority == null) {
|
||||
// no tasks are queued, ensure we aren't in activeQueues
|
||||
synchronized (pool.activeQueues) {
|
||||
@@ -571,7 +493,11 @@ public final class PrioritisedThreadPool {
|
||||
} else {
|
||||
this.scheduledPriority = Priority.HIGHEST;
|
||||
}
|
||||
+ // Gale start - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
+ } finally {
|
||||
+ queuesLock.release();
|
||||
}
|
||||
+ // Gale end - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
- synchronized (pool.activeQueues) {
|
||||
+ try (var ignored2 = pool.activeQueuesLock.withSpinLock()) { // Gale - base thread pool - chunk worker task queue - spin lock for pool queues
|
||||
pool.activeQueues.remove(this);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
diff --git a/src/main/java/io/papermc/paper/chunk/system/scheduling/ChunkTaskScheduler.java b/src/main/java/io/papermc/paper/chunk/system/scheduling/ChunkTaskScheduler.java
|
||||
index f5c15d40094c2ddc6220b0595597d12103fcf425..79ef41d2bb30beee2355d1de3dc99c9e00d929d5 100644
|
||||
--- a/src/main/java/io/papermc/paper/chunk/system/scheduling/ChunkTaskScheduler.java
|
||||
@@ -532,6 +494,18 @@ index 0019e5eefc4b638526a75dd3706a54033dd9b811..7ed820d2483bf6741a355b062f062a04
|
||||
this.saveAllChunks(false, true, false, true); // Paper - rewrite chunk system - move closing into here
|
||||
|
||||
this.isSaving = false;
|
||||
diff --git a/src/main/java/org/galemc/gale/executor/chunksystem/PrioritisedQueueExecutorThreadAgent.java b/src/main/java/org/galemc/gale/executor/chunksystem/PrioritisedQueueExecutorThreadAgent.java
|
||||
index 1cd17795c8f8a7a0d111123e662904522a6c8819..a6db8e2ca224410cc36080dbd5e8fe0e34ecabc0 100644
|
||||
--- a/src/main/java/org/galemc/gale/executor/chunksystem/PrioritisedQueueExecutorThreadAgent.java
|
||||
+++ b/src/main/java/org/galemc/gale/executor/chunksystem/PrioritisedQueueExecutorThreadAgent.java
|
||||
@@ -9,7 +9,6 @@ import org.galemc.gale.executor.queue.BaseTaskQueues;
|
||||
import org.galemc.gale.executor.thread.BaseThread;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
-
|
||||
/**
|
||||
* This class is a copy of {@link PrioritisedQueueExecutorThread}, with the notable difference
|
||||
* that it does not extend {@link Thread}, but may be instantiated on its own, as an agent representing
|
||||
diff --git a/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueueTier.java b/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueueTier.java
|
||||
index 9a908566ccdda9ca77e0f9236f674f17f79e9c40..dc006d9940ef8114a3a3e4860fbc1da0f7c2ee60 100644
|
||||
--- a/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueueTier.java
|
||||
@@ -571,10 +545,10 @@ index ed3ccf2e64539363a7be2d507c68c40b5913f75c..a12250e5aaed02995b7bf09a8018a93f
|
||||
}
|
||||
diff --git a/src/main/java/org/galemc/gale/executor/queue/ChunkWorkerTaskQueue.java b/src/main/java/org/galemc/gale/executor/queue/ChunkWorkerTaskQueue.java
|
||||
new file mode 100644
|
||||
index 0000000000000000000000000000000000000000..0e2e11b883d8910e97c5700f7f6a7d1b545a6164
|
||||
index 0000000000000000000000000000000000000000..c82a83a310b0dffd2dc77256b128d0e58d6df691
|
||||
--- /dev/null
|
||||
+++ b/src/main/java/org/galemc/gale/executor/queue/ChunkWorkerTaskQueue.java
|
||||
@@ -0,0 +1,108 @@
|
||||
@@ -0,0 +1,104 @@
|
||||
+// Gale - base thread pool - chunk worker task queue
|
||||
+
|
||||
+package org.galemc.gale.executor.queue;
|
||||
@@ -620,16 +594,12 @@ index 0000000000000000000000000000000000000000..0e2e11b883d8910e97c5700f7f6a7d1b
|
||||
+ if (workerThreads == null) {
|
||||
+ return false;
|
||||
+ }
|
||||
+ //noinspection StatementWithEmptyBody
|
||||
+ while (!workerThreads.queuesLock.tryAcquire());
|
||||
+ try {
|
||||
+ try (var ignored = workerThreads.queuesLock.withSpinLock()) {
|
||||
+ for (var queue : workerThreads.queues) {
|
||||
+ if (queue.hasScheduledUncompletedTasksVolatile()) {
|
||||
+ return true;
|
||||
+ }
|
||||
+ }
|
||||
+ } finally {
|
||||
+ workerThreads.queuesLock.release();
|
||||
+ }
|
||||
+ return false;
|
||||
+ }
|
||||
|
||||
@@ -7,7 +7,7 @@ License: AGPL-3.0 (https://www.gnu.org/licenses/agpl-3.0.html)
|
||||
Gale - https://galemc.org
|
||||
|
||||
diff --git a/src/main/java/com/destroystokyo/paper/util/pooled/PooledObjects.java b/src/main/java/com/destroystokyo/paper/util/pooled/PooledObjects.java
|
||||
index a743703502cea333bd4231b6557de50e8eaf81eb..0566757ed94cb2c41ace660ef71c8f99b0335032 100644
|
||||
index a743703502cea333bd4231b6557de50e8eaf81eb..5531c681b3547c32f4fc8ec86bb722e1ee653826 100644
|
||||
--- a/src/main/java/com/destroystokyo/paper/util/pooled/PooledObjects.java
|
||||
+++ b/src/main/java/com/destroystokyo/paper/util/pooled/PooledObjects.java
|
||||
@@ -2,11 +2,18 @@ package com.destroystokyo.paper.util.pooled;
|
||||
@@ -37,41 +37,21 @@ index a743703502cea333bd4231b6557de50e8eaf81eb..0566757ed94cb2c41ace660ef71c8f99
|
||||
|
||||
public PooledObjects(final Supplier<E> creator, int maxPoolSize) {
|
||||
this(creator, maxPoolSize, null);
|
||||
@@ -66,8 +74,16 @@ public final class PooledObjects<E> {
|
||||
@@ -66,7 +74,7 @@ public final class PooledObjects<E> {
|
||||
|
||||
public final E acquire() {
|
||||
E value;
|
||||
- synchronized (queue) {
|
||||
+ // Gale start - non-blocking PooledObjects
|
||||
+ //noinspection StatementWithEmptyBody
|
||||
+ while (!this.queueLock.tryAcquire());
|
||||
+ try {
|
||||
+ // Gale end - non-blocking PooledObjects
|
||||
+ try (var ignored = this.queueLock.withSpinLock()) { // Gale - non-blocking PooledObjects
|
||||
value = this.queue.pollLast();
|
||||
+ // Gale start - non-blocking PooledObjects
|
||||
+ } finally {
|
||||
+ this.queueLock.release();
|
||||
+ // Gale end - non-blocking PooledObjects
|
||||
}
|
||||
return value != null ? value : this.creator.get();
|
||||
}
|
||||
@@ -76,10 +92,18 @@ public final class PooledObjects<E> {
|
||||
@@ -76,7 +84,7 @@ public final class PooledObjects<E> {
|
||||
if (this.releaser != null) {
|
||||
this.releaser.accept(value);
|
||||
}
|
||||
- synchronized (this.queue) {
|
||||
+ // Gale start - non-blocking PooledObjects
|
||||
+ //noinspection StatementWithEmptyBody
|
||||
+ while (!this.queueLock.tryAcquire());
|
||||
+ try {
|
||||
+ // Gale end - non-blocking PooledObjects
|
||||
+ try (var ignored = this.queueLock.withSpinLock()) { // Gale - non-blocking PooledObjects
|
||||
if (queue.size() < this.maxPoolSize) {
|
||||
this.queue.addLast(value);
|
||||
}
|
||||
+ // Gale start - non-blocking PooledObjects
|
||||
+ } finally {
|
||||
+ this.queueLock.release();
|
||||
+ // Gale end - non-blocking PooledObjects
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user