9
0
mirror of https://github.com/Winds-Studio/Leaf.git synced 2025-12-25 18:09:17 +00:00

fix async tracker deadlock

This commit is contained in:
hayanesuru
2025-07-17 01:47:30 +09:00
parent cecd8d751a
commit b079543135
2 changed files with 14 additions and 26 deletions

View File

@@ -5,13 +5,11 @@ import org.dreeam.leaf.util.queue.MpmcQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
public final class FixedThreadExecutor {
private final Thread[] threads;
private final Worker[] workers;
public final MpmcQueue<Runnable> channel;
public final Object sync;
private static volatile boolean SHUTDOWN = false;
public FixedThreadExecutor(int numThreads, int queue, String prefix) {
@@ -19,16 +17,15 @@ public final class FixedThreadExecutor {
throw new IllegalArgumentException();
}
this.threads = new Thread[numThreads];
this.workers = new Worker[numThreads];
this.channel = new MpmcQueue<>(Runnable.class, queue);
this.sync = new Object();
for (int i = 0; i < numThreads; i++) {
workers[i] = new Worker(channel, new AtomicBoolean(false));
threads[i] = Thread.ofPlatform()
.uncaughtExceptionHandler(Util::onThreadException)
.daemon(false)
.priority(Thread.NORM_PRIORITY)
.name(prefix + " - " + i)
.start(workers[i]);
.start(new Worker(channel, sync));
}
}
@@ -45,18 +42,15 @@ public final class FixedThreadExecutor {
}
public void unpack() {
int size = Math.min(Math.max(1, channel.length()), threads.length);
for (int i = 0; i < size; i++) {
if (workers[i].parked.get()) {
LockSupport.unpark(threads[i]);
}
synchronized (sync) {
sync.notifyAll();
}
}
public void shutdown() {
SHUTDOWN = true;
for (final Thread worker : threads) {
LockSupport.unpark(worker);
synchronized (sync) {
sync.notifyAll();
}
}
@@ -75,23 +69,21 @@ public final class FixedThreadExecutor {
}
}
private record Worker(MpmcQueue<Runnable> channel, AtomicBoolean parked) implements Runnable {
private record Worker(MpmcQueue<Runnable> channel, Object sync) implements Runnable {
@Override
public void run() {
while (true) {
final Runnable task = channel.recv();
if (task != null) {
parked.set(false);
task.run();
} else if (SHUTDOWN) {
break;
} else if (channel.isEmpty()) {
Thread.yield();
if (parked.compareAndSet(false, true)) {
if (channel.isEmpty()) {
LockSupport.park();
} else {
parked.set(false);
} else if (!channel.isEmpty()) {
synchronized (sync) {
try {
sync.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

View File

@@ -66,10 +66,6 @@ public final class AsyncTracker {
}
public static void onTickEnd(MinecraftServer server) {
if (!TRACKER_EXECUTOR.channel.isEmpty()) {
// not likely
TRACKER_EXECUTOR.unpack();
}
for (ServerLevel world : server.getAllLevels()) {
Future<TrackerCtx>[] task = world.trackerTask;
if (task != null) {