mirror of
https://github.com/BX-Team/DivineMC.git
synced 2025-12-28 11:19:19 +00:00
rewrite chunk system
This commit is contained in:
@@ -0,0 +1,191 @@
|
||||
package com.ishland.flowsched.executor;
|
||||
|
||||
import com.ishland.flowsched.structs.DynamicPriorityQueue;
|
||||
import com.ishland.flowsched.util.Assertions;
|
||||
import it.unimi.dsi.fastutil.objects.ReferenceArrayList;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
public class ExecutorManager {
|
||||
public final DynamicPriorityQueue<Task> globalWorkQueue;
|
||||
protected final ConcurrentMap<LockToken, FreeableTaskList> lockListeners = new ConcurrentHashMap<>();
|
||||
protected final WorkerThread[] workerThreads;
|
||||
final Object workerMonitor = new Object();
|
||||
|
||||
/**
|
||||
* Creates a new executor manager.
|
||||
*
|
||||
* @param workerThreadCount the number of worker threads.
|
||||
*/
|
||||
public ExecutorManager(int workerThreadCount) {
|
||||
this(workerThreadCount, thread -> { });
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new executor manager.
|
||||
*
|
||||
* @param workerThreadCount the number of worker threads.
|
||||
* @param threadInitializer the thread initializer.
|
||||
*/
|
||||
public ExecutorManager(int workerThreadCount, Consumer<Thread> threadInitializer) {
|
||||
globalWorkQueue = new DynamicPriorityQueue<>();
|
||||
workerThreads = new WorkerThread[workerThreadCount];
|
||||
for (int i = 0; i < workerThreadCount; i++) {
|
||||
final WorkerThread thread = new WorkerThread(this);
|
||||
threadInitializer.accept(thread);
|
||||
thread.start();
|
||||
workerThreads[i] = thread;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempt to lock the given tokens.
|
||||
* The caller should discard the task if this method returns false, as it reschedules the task.
|
||||
*
|
||||
* @return {@code true} if the lock is acquired, {@code false} otherwise.
|
||||
*/
|
||||
boolean tryLock(Task task) {
|
||||
retry:
|
||||
while (true) {
|
||||
final FreeableTaskList listenerSet = new FreeableTaskList();
|
||||
LockToken[] lockTokens = task.lockTokens();
|
||||
for (int i = 0; i < lockTokens.length; i++) {
|
||||
LockToken token = lockTokens[i];
|
||||
final FreeableTaskList present = this.lockListeners.putIfAbsent(token, listenerSet);
|
||||
if (present != null) {
|
||||
for (int j = 0; j < i; j++) {
|
||||
this.lockListeners.remove(lockTokens[j], listenerSet);
|
||||
}
|
||||
callListeners(listenerSet);
|
||||
synchronized (present) {
|
||||
if (present.freed) {
|
||||
continue retry;
|
||||
} else {
|
||||
present.add(task);
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Release the locks held by the given task.
|
||||
*
|
||||
* @param task the task.
|
||||
*/
|
||||
void releaseLocks(Task task) {
|
||||
FreeableTaskList expectedListeners = null;
|
||||
for (LockToken token : task.lockTokens()) {
|
||||
final FreeableTaskList listeners = this.lockListeners.remove(token);
|
||||
if (listeners != null) {
|
||||
if (expectedListeners == null) {
|
||||
expectedListeners = listeners;
|
||||
} else {
|
||||
Assertions.assertTrue(expectedListeners == listeners, "Inconsistent lock listeners");
|
||||
}
|
||||
} else {
|
||||
throw new IllegalStateException("Lock token " + token + " is not locked");
|
||||
}
|
||||
}
|
||||
if (expectedListeners != null) {
|
||||
callListeners(expectedListeners); // synchronizes
|
||||
}
|
||||
}
|
||||
|
||||
private void callListeners(FreeableTaskList listeners) {
|
||||
synchronized (listeners) {
|
||||
listeners.freed = true;
|
||||
if (listeners.isEmpty()) return;
|
||||
for (Task listener : listeners) {
|
||||
this.schedule0(listener);
|
||||
}
|
||||
}
|
||||
this.wakeup();
|
||||
}
|
||||
|
||||
/**
|
||||
* Polls an executable task from the global work queue.
|
||||
*
|
||||
* @return the task, or {@code null} if no task is executable.
|
||||
*/
|
||||
Task pollExecutableTask() {
|
||||
Task task;
|
||||
while ((task = this.globalWorkQueue.dequeue()) != null) {
|
||||
if (this.tryLock(task)) {
|
||||
return task;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Shuts down the executor manager.
|
||||
*/
|
||||
public void shutdown() {
|
||||
for (WorkerThread workerThread : workerThreads) {
|
||||
workerThread.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedules a task.
|
||||
*
|
||||
* @param task the task.
|
||||
*/
|
||||
public void schedule(Task task) {
|
||||
schedule0(task);
|
||||
wakeup();
|
||||
}
|
||||
|
||||
private void schedule0(Task task) {
|
||||
this.globalWorkQueue.enqueue(task, task.priority());
|
||||
}
|
||||
|
||||
public void wakeup() { // Canvas - private -> public
|
||||
synchronized (this.workerMonitor) {
|
||||
this.workerMonitor.notify();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean hasPendingTasks() {
|
||||
return this.globalWorkQueue.size() != 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedules a runnable for execution with the given priority.
|
||||
*
|
||||
* @param runnable the runnable.
|
||||
* @param priority the priority.
|
||||
*/
|
||||
public void schedule(Runnable runnable, int priority) {
|
||||
this.schedule(new SimpleTask(runnable, priority));
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an executor that schedules runnables with the given priority.
|
||||
*
|
||||
* @param priority the priority.
|
||||
* @return the executor.
|
||||
*/
|
||||
public Executor executor(int priority) {
|
||||
return runnable -> this.schedule(runnable, priority);
|
||||
}
|
||||
|
||||
/**
|
||||
* Notifies the executor manager that the priority of the given task has changed.
|
||||
*
|
||||
* @param task the task.
|
||||
*/
|
||||
public void notifyPriorityChange(Task task) {
|
||||
this.globalWorkQueue.changePriority(task, task.priority());
|
||||
}
|
||||
|
||||
protected static class FreeableTaskList extends ReferenceArrayList<Task> { // Canvas - private -> protected
|
||||
private boolean freed = false;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
package com.ishland.flowsched.executor;
|
||||
|
||||
public interface LockToken { }
|
||||
@@ -0,0 +1,37 @@
|
||||
package com.ishland.flowsched.executor;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
public class SimpleTask implements Task {
|
||||
private final Runnable wrapped;
|
||||
private final int priority;
|
||||
|
||||
public SimpleTask(Runnable wrapped, int priority) {
|
||||
this.wrapped = Objects.requireNonNull(wrapped);
|
||||
this.priority = priority;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run(Runnable releaseLocks) {
|
||||
try {
|
||||
wrapped.run();
|
||||
} finally {
|
||||
releaseLocks.run();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void propagateException(Throwable t) {
|
||||
t.printStackTrace();
|
||||
}
|
||||
|
||||
@Override
|
||||
public LockToken[] lockTokens() {
|
||||
return new LockToken[0];
|
||||
}
|
||||
|
||||
@Override
|
||||
public int priority() {
|
||||
return this.priority;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
package com.ishland.flowsched.executor;
|
||||
|
||||
public interface Task {
|
||||
void run(Runnable releaseLocks);
|
||||
|
||||
void propagateException(Throwable t);
|
||||
|
||||
LockToken[] lockTokens();
|
||||
|
||||
int priority();
|
||||
}
|
||||
@@ -0,0 +1,84 @@
|
||||
package com.ishland.flowsched.executor;
|
||||
|
||||
import ca.spottedleaf.moonrise.common.util.TickThread;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.locks.LockSupport;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class WorkerThread extends TickThread {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger("FlowSched Executor Worker Thread");
|
||||
|
||||
private final ExecutorManager executorManager;
|
||||
private final AtomicBoolean shutdown = new AtomicBoolean(false);
|
||||
public volatile boolean active = false;
|
||||
|
||||
public WorkerThread(ExecutorManager executorManager) {
|
||||
super("null_worker");
|
||||
this.executorManager = executorManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
main_loop:
|
||||
while (true) {
|
||||
if (this.shutdown.get()) {
|
||||
return;
|
||||
}
|
||||
active = true;
|
||||
if (pollTasks()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
synchronized (this.executorManager.workerMonitor) {
|
||||
if (this.executorManager.hasPendingTasks()) continue main_loop;
|
||||
try {
|
||||
active = false;
|
||||
this.executorManager.workerMonitor.wait();
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean pollTasks() {
|
||||
final Task task = executorManager.pollExecutableTask();
|
||||
try {
|
||||
if (task != null) {
|
||||
AtomicBoolean released = new AtomicBoolean(false);
|
||||
try {
|
||||
task.run(() -> {
|
||||
if (released.compareAndSet(false, true)) {
|
||||
executorManager.releaseLocks(task);
|
||||
}
|
||||
});
|
||||
} catch (Throwable t) {
|
||||
try {
|
||||
if (released.compareAndSet(false, true)) {
|
||||
executorManager.releaseLocks(task);
|
||||
}
|
||||
} catch (Throwable t1) {
|
||||
t.addSuppressed(t1);
|
||||
LOGGER.error("Exception thrown while releasing locks", t);
|
||||
}
|
||||
try {
|
||||
task.propagateException(t);
|
||||
} catch (Throwable t1) {
|
||||
t.addSuppressed(t1);
|
||||
LOGGER.error("Exception thrown while propagating exception", t);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
} catch (Throwable t) {
|
||||
LOGGER.error("Exception thrown while executing task", t);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
shutdown.set(true);
|
||||
LockSupport.unpark(this);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,85 @@
|
||||
package com.ishland.flowsched.structs;
|
||||
|
||||
import org.bxteam.divinemc.server.chunk.PriorityHandler;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.atomic.AtomicIntegerArray;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public class DynamicPriorityQueue<E> {
|
||||
private final AtomicIntegerArray taskCount;
|
||||
public final ConcurrentLinkedQueue<E>[] priorities;
|
||||
private final ConcurrentHashMap<E, Integer> priorityMap = new ConcurrentHashMap<>();
|
||||
|
||||
public DynamicPriorityQueue() {
|
||||
this.taskCount = new AtomicIntegerArray(PriorityHandler.MAX_PRIORITY + 1);
|
||||
this.priorities = new ConcurrentLinkedQueue[PriorityHandler.MAX_PRIORITY + 1];
|
||||
for (int i = 0; i < (PriorityHandler.MAX_PRIORITY + 1); i++) {
|
||||
this.priorities[i] = new ConcurrentLinkedQueue<>();
|
||||
}
|
||||
}
|
||||
|
||||
public void enqueue(E element, int priority) {
|
||||
if (this.priorityMap.putIfAbsent(element, priority) != null)
|
||||
throw new IllegalArgumentException("Element already in queue");
|
||||
|
||||
this.priorities[priority].add(element);
|
||||
this.taskCount.incrementAndGet(priority);
|
||||
}
|
||||
|
||||
public boolean changePriority(E element, int newPriority) {
|
||||
Integer currentPriority = this.priorityMap.get(element);
|
||||
if (currentPriority == null || currentPriority == newPriority) {
|
||||
return false;
|
||||
}
|
||||
|
||||
int currentIndex = currentPriority;
|
||||
boolean removedFromQueue = this.priorities[currentIndex].remove(element);
|
||||
if (!removedFromQueue) {
|
||||
return false;
|
||||
}
|
||||
|
||||
this.taskCount.decrementAndGet(currentIndex);
|
||||
final boolean changeSuccess = this.priorityMap.replace(element, currentPriority, newPriority);
|
||||
if (!changeSuccess) {
|
||||
return false;
|
||||
}
|
||||
|
||||
this.priorities[newPriority].add(element);
|
||||
this.taskCount.incrementAndGet(newPriority);
|
||||
return true;
|
||||
}
|
||||
|
||||
public E dequeue() {
|
||||
for (int i = 0; i < this.priorities.length; i++) {
|
||||
if (this.taskCount.get(i) == 0) continue;
|
||||
E element = priorities[i].poll();
|
||||
if (element != null) {
|
||||
this.taskCount.decrementAndGet(i);
|
||||
this.priorityMap.remove(element);
|
||||
return element;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public boolean contains(E element) {
|
||||
return priorityMap.containsKey(element);
|
||||
}
|
||||
|
||||
public void remove(E element) {
|
||||
Integer priority = this.priorityMap.remove(element);
|
||||
if (priority == null) return;
|
||||
|
||||
boolean removed = this.priorities[priority].remove(element);
|
||||
if (removed) this.taskCount.decrementAndGet(priority);
|
||||
}
|
||||
|
||||
public int size() {
|
||||
return priorityMap.size();
|
||||
}
|
||||
|
||||
public boolean isEmpty() {
|
||||
return size() == 0;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,27 @@
|
||||
package com.ishland.flowsched.util;
|
||||
|
||||
public final class Assertions {
|
||||
public static void assertTrue(boolean value, String message) {
|
||||
if (!value) {
|
||||
final AssertionError error = new AssertionError(message);
|
||||
error.printStackTrace();
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
public static void assertTrue(boolean state, String format, Object... args) {
|
||||
if (!state) {
|
||||
final AssertionError error = new AssertionError(String.format(format, args));
|
||||
error.printStackTrace();
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
public static void assertTrue(boolean value) {
|
||||
if (!value) {
|
||||
final AssertionError error = new AssertionError();
|
||||
error.printStackTrace();
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user