9
0
mirror of https://github.com/BX-Team/DivineMC.git synced 2025-12-28 11:19:19 +00:00

move patches to work

This commit is contained in:
NONPLAYT
2025-07-06 01:21:32 +03:00
parent c15046f9ae
commit dd0274ddc1
86 changed files with 17 additions and 2172 deletions

View File

@@ -1,191 +0,0 @@
package com.ishland.flowsched.executor;
import com.ishland.flowsched.structs.DynamicPriorityQueue;
import com.ishland.flowsched.util.Assertions;
import it.unimi.dsi.fastutil.objects.ReferenceArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
public class ExecutorManager {
public final DynamicPriorityQueue<Task> globalWorkQueue;
protected final ConcurrentMap<LockToken, FreeableTaskList> lockListeners = new ConcurrentHashMap<>();
protected final WorkerThread[] workerThreads;
final Object workerMonitor = new Object();
/**
* Creates a new executor manager.
*
* @param workerThreadCount the number of worker threads.
*/
public ExecutorManager(int workerThreadCount) {
this(workerThreadCount, thread -> { });
}
/**
* Creates a new executor manager.
*
* @param workerThreadCount the number of worker threads.
* @param threadInitializer the thread initializer.
*/
public ExecutorManager(int workerThreadCount, Consumer<Thread> threadInitializer) {
globalWorkQueue = new DynamicPriorityQueue<>();
workerThreads = new WorkerThread[workerThreadCount];
for (int i = 0; i < workerThreadCount; i++) {
final WorkerThread thread = new WorkerThread(this);
threadInitializer.accept(thread);
thread.start();
workerThreads[i] = thread;
}
}
/**
* Attempt to lock the given tokens.
* The caller should discard the task if this method returns false, as it reschedules the task.
*
* @return {@code true} if the lock is acquired, {@code false} otherwise.
*/
boolean tryLock(Task task) {
retry:
while (true) {
final FreeableTaskList listenerSet = new FreeableTaskList();
LockToken[] lockTokens = task.lockTokens();
for (int i = 0; i < lockTokens.length; i++) {
LockToken token = lockTokens[i];
final FreeableTaskList present = this.lockListeners.putIfAbsent(token, listenerSet);
if (present != null) {
for (int j = 0; j < i; j++) {
this.lockListeners.remove(lockTokens[j], listenerSet);
}
callListeners(listenerSet);
synchronized (present) {
if (present.freed) {
continue retry;
} else {
present.add(task);
}
}
return false;
}
}
return true;
}
}
/**
* Release the locks held by the given task.
*
* @param task the task.
*/
void releaseLocks(Task task) {
FreeableTaskList expectedListeners = null;
for (LockToken token : task.lockTokens()) {
final FreeableTaskList listeners = this.lockListeners.remove(token);
if (listeners != null) {
if (expectedListeners == null) {
expectedListeners = listeners;
} else {
Assertions.assertTrue(expectedListeners == listeners, "Inconsistent lock listeners");
}
} else {
throw new IllegalStateException("Lock token " + token + " is not locked");
}
}
if (expectedListeners != null) {
callListeners(expectedListeners); // synchronizes
}
}
private void callListeners(FreeableTaskList listeners) {
synchronized (listeners) {
listeners.freed = true;
if (listeners.isEmpty()) return;
for (Task listener : listeners) {
this.schedule0(listener);
}
}
this.wakeup();
}
/**
* Polls an executable task from the global work queue.
*
* @return the task, or {@code null} if no task is executable.
*/
Task pollExecutableTask() {
Task task;
while ((task = this.globalWorkQueue.dequeue()) != null) {
if (this.tryLock(task)) {
return task;
}
}
return null;
}
/**
* Shuts down the executor manager.
*/
public void shutdown() {
for (WorkerThread workerThread : workerThreads) {
workerThread.shutdown();
}
}
/**
* Schedules a task.
*
* @param task the task.
*/
public void schedule(Task task) {
schedule0(task);
wakeup();
}
private void schedule0(Task task) {
this.globalWorkQueue.enqueue(task, task.priority());
}
public void wakeup() { // Canvas - private -> public
synchronized (this.workerMonitor) {
this.workerMonitor.notify();
}
}
public boolean hasPendingTasks() {
return this.globalWorkQueue.size() != 0;
}
/**
* Schedules a runnable for execution with the given priority.
*
* @param runnable the runnable.
* @param priority the priority.
*/
public void schedule(Runnable runnable, int priority) {
this.schedule(new SimpleTask(runnable, priority));
}
/**
* Creates an executor that schedules runnables with the given priority.
*
* @param priority the priority.
* @return the executor.
*/
public Executor executor(int priority) {
return runnable -> this.schedule(runnable, priority);
}
/**
* Notifies the executor manager that the priority of the given task has changed.
*
* @param task the task.
*/
public void notifyPriorityChange(Task task) {
this.globalWorkQueue.changePriority(task, task.priority());
}
protected static class FreeableTaskList extends ReferenceArrayList<Task> { // Canvas - private -> protected
private boolean freed = false;
}
}

View File

@@ -1,3 +0,0 @@
package com.ishland.flowsched.executor;
public interface LockToken { }

View File

@@ -1,37 +0,0 @@
package com.ishland.flowsched.executor;
import java.util.Objects;
public class SimpleTask implements Task {
private final Runnable wrapped;
private final int priority;
public SimpleTask(Runnable wrapped, int priority) {
this.wrapped = Objects.requireNonNull(wrapped);
this.priority = priority;
}
@Override
public void run(Runnable releaseLocks) {
try {
wrapped.run();
} finally {
releaseLocks.run();
}
}
@Override
public void propagateException(Throwable t) {
t.printStackTrace();
}
@Override
public LockToken[] lockTokens() {
return new LockToken[0];
}
@Override
public int priority() {
return this.priority;
}
}

View File

@@ -1,11 +0,0 @@
package com.ishland.flowsched.executor;
public interface Task {
void run(Runnable releaseLocks);
void propagateException(Throwable t);
LockToken[] lockTokens();
int priority();
}

View File

@@ -1,84 +0,0 @@
package com.ishland.flowsched.executor;
import ca.spottedleaf.moonrise.common.util.TickThread;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WorkerThread extends TickThread {
private static final Logger LOGGER = LoggerFactory.getLogger("FlowSched Executor Worker Thread");
private final ExecutorManager executorManager;
private final AtomicBoolean shutdown = new AtomicBoolean(false);
public volatile boolean active = false;
public WorkerThread(ExecutorManager executorManager) {
super("null_worker");
this.executorManager = executorManager;
}
@Override
public void run() {
main_loop:
while (true) {
if (this.shutdown.get()) {
return;
}
active = true;
if (pollTasks()) {
continue;
}
synchronized (this.executorManager.workerMonitor) {
if (this.executorManager.hasPendingTasks()) continue main_loop;
try {
active = false;
this.executorManager.workerMonitor.wait();
} catch (InterruptedException ignored) {
}
}
}
}
private boolean pollTasks() {
final Task task = executorManager.pollExecutableTask();
try {
if (task != null) {
AtomicBoolean released = new AtomicBoolean(false);
try {
task.run(() -> {
if (released.compareAndSet(false, true)) {
executorManager.releaseLocks(task);
}
});
} catch (Throwable t) {
try {
if (released.compareAndSet(false, true)) {
executorManager.releaseLocks(task);
}
} catch (Throwable t1) {
t.addSuppressed(t1);
LOGGER.error("Exception thrown while releasing locks", t);
}
try {
task.propagateException(t);
} catch (Throwable t1) {
t.addSuppressed(t1);
LOGGER.error("Exception thrown while propagating exception", t);
}
}
return true;
}
return false;
} catch (Throwable t) {
LOGGER.error("Exception thrown while executing task", t);
return true;
}
}
public void shutdown() {
shutdown.set(true);
LockSupport.unpark(this);
}
}

View File

@@ -1,86 +0,0 @@
package com.ishland.flowsched.structs;
import ca.spottedleaf.moonrise.common.util.MoonriseConstants;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicIntegerArray;
@SuppressWarnings("unchecked")
public class DynamicPriorityQueue<E> {
public static final int MAX_PRIORITY = MoonriseConstants.MAX_VIEW_DISTANCE + 3;
private final AtomicIntegerArray taskCount;
public final ConcurrentLinkedQueue<E>[] priorities;
private final ConcurrentHashMap<E, Integer> priorityMap = new ConcurrentHashMap<>();
public DynamicPriorityQueue() {
this.taskCount = new AtomicIntegerArray(MAX_PRIORITY);
this.priorities = new ConcurrentLinkedQueue[MAX_PRIORITY];
for (int i = 0; i < (MAX_PRIORITY); i++) {
this.priorities[i] = new ConcurrentLinkedQueue<>();
}
}
public void enqueue(E element, int priority) {
if (this.priorityMap.putIfAbsent(element, priority) != null)
throw new IllegalArgumentException("Element already in queue");
this.priorities[priority].add(element);
this.taskCount.incrementAndGet(priority);
}
public boolean changePriority(E element, int newPriority) {
Integer currentPriority = this.priorityMap.get(element);
if (currentPriority == null || currentPriority == newPriority) {
return false;
}
int currentIndex = currentPriority;
boolean removedFromQueue = this.priorities[currentIndex].remove(element);
if (!removedFromQueue) {
return false;
}
this.taskCount.decrementAndGet(currentIndex);
final boolean changeSuccess = this.priorityMap.replace(element, currentPriority, newPriority);
if (!changeSuccess) {
return false;
}
this.priorities[newPriority].add(element);
this.taskCount.incrementAndGet(newPriority);
return true;
}
public E dequeue() {
for (int i = 0; i < this.priorities.length; i++) {
if (this.taskCount.get(i) == 0) continue;
E element = priorities[i].poll();
if (element != null) {
this.taskCount.decrementAndGet(i);
this.priorityMap.remove(element);
return element;
}
}
return null;
}
public boolean contains(E element) {
return priorityMap.containsKey(element);
}
public void remove(E element) {
Integer priority = this.priorityMap.remove(element);
if (priority == null) return;
boolean removed = this.priorities[priority].remove(element);
if (removed) this.taskCount.decrementAndGet(priority);
}
public int size() {
return priorityMap.size();
}
public boolean isEmpty() {
return size() == 0;
}
}

View File

@@ -1,27 +0,0 @@
package com.ishland.flowsched.util;
public final class Assertions {
public static void assertTrue(boolean value, String message) {
if (!value) {
final AssertionError error = new AssertionError(message);
error.printStackTrace();
throw error;
}
}
public static void assertTrue(boolean state, String format, Object... args) {
if (!state) {
final AssertionError error = new AssertionError(String.format(format, args));
error.printStackTrace();
throw error;
}
}
public static void assertTrue(boolean value) {
if (!value) {
final AssertionError error = new AssertionError();
error.printStackTrace();
throw error;
}
}
}