9
0
mirror of https://github.com/VolmitSoftware/Iris.git synced 2025-12-28 11:39:07 +00:00

Iris but forkjoin

This commit is contained in:
cyberpwn
2021-08-21 02:42:22 -04:00
parent 72b4c9c6ab
commit d686d07d53
23 changed files with 125 additions and 233 deletions

View File

@@ -46,6 +46,7 @@ import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -386,7 +387,7 @@ public class Mantle {
* @return the future of a tectonic plate.
*/
@RegionCoordinates
private CompletableFuture<TectonicPlate> getSafe(int x, int z) {
private Future<TectonicPlate> getSafe(int x, int z) {
Long k = key(x, z);
TectonicPlate p = loadedRegions.get(k);
@@ -449,7 +450,10 @@ public class Mantle {
public static File fileForRegion(File folder, Long key) {
String id = UUID.nameUUIDFromBytes(("TectonicPlate:" + key).getBytes(StandardCharsets.UTF_8)).toString();
File f = new File(folder, id.substring(0, 2) + "/" + id.split("\\Q-\\E")[3] + "/" + id + ".ttp");
f.getParentFile().mkdirs();
if(!f.getParentFile().exists())
{
f.getParentFile().mkdirs();
}
return f;
}

View File

@@ -28,45 +28,27 @@ import java.util.concurrent.*;
@SuppressWarnings("ALL")
public class BurstExecutor {
private final ExecutorService executor;
private final KList<CompletableFuture<Void>> futures;
@Setter
private boolean multicore = true;
private final KList<Future<?>> futures;
public BurstExecutor(ExecutorService executor, int burstSizeEstimate) {
this.executor = executor;
futures = new KList<CompletableFuture<Void>>(burstSizeEstimate);
futures = new KList<Future<?>>(burstSizeEstimate);
}
@SuppressWarnings("UnusedReturnValue")
public CompletableFuture<Void> queue(Runnable r) {
if(!multicore)
{
r.run();
return null;
}
public Future<?> queue(Runnable r) {
synchronized (futures) {
CompletableFuture<Void> c = CompletableFuture.runAsync(r, executor);
Future<?> c = executor.submit(r);
futures.add(c);
return c;
}
}
public BurstExecutor queue(List<Runnable> r) {
if(!multicore)
{
for(Runnable i : r)
{
i.run();
}
return this;
}
synchronized (futures) {
for (Runnable i : new KList<>(r)) {
CompletableFuture<Void> c = CompletableFuture.runAsync(i, executor);
futures.add(c);
queue(i);
}
}
@@ -74,20 +56,9 @@ public class BurstExecutor {
}
public BurstExecutor queue(Runnable[] r) {
if(!multicore)
{
for(Runnable i : r)
{
i.run();
}
return this;
}
synchronized (futures) {
for (Runnable i : r) {
CompletableFuture<Void> c = CompletableFuture.runAsync(i, executor);
futures.add(c);
queue(i);
}
}
@@ -95,18 +66,17 @@ public class BurstExecutor {
}
public void complete() {
if(!multicore)
{
return;
}
synchronized (futures) {
if (futures.isEmpty()) {
return;
}
try {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
for(Future<?> i : futures)
{
i.get();
}
futures.clear();
} catch (InterruptedException | ExecutionException e) {
Iris.reportError(e);
@@ -115,11 +85,6 @@ public class BurstExecutor {
}
public boolean complete(long maxDur) {
if(!multicore)
{
return true;
}
synchronized (futures) {
if (futures.isEmpty()) {
return true;

View File

@@ -19,13 +19,10 @@
package com.volmit.iris.util.parallel;
import com.volmit.iris.Iris;
import com.volmit.iris.core.IrisSettings;
import com.volmit.iris.core.service.PreservationSVC;
import com.volmit.iris.util.collection.KList;
import com.volmit.iris.util.io.InstanceState;
import com.volmit.iris.util.math.M;
import com.volmit.iris.util.scheduling.J;
import com.volmit.iris.util.scheduling.Looper;
import org.jetbrains.annotations.NotNull;
import java.util.List;
import java.util.concurrent.*;
@@ -33,66 +30,39 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
public class MultiBurst {
public static final MultiBurst burst = new MultiBurst("Iris", IrisSettings.get().getConcurrency().getMiscThreadPriority(), IrisSettings.getThreadCount(IrisSettings.get().getConcurrency().getMiscThreadCount()));
public static final MultiBurst burst = new MultiBurst();
private ExecutorService service;
private final Looper heartbeat;
private final AtomicLong last;
private int tid;
private final String name;
private final int tc;
private final int priority;
private final int instance;
public MultiBurst(int tc) {
this("Iris", 6, tc);
public MultiBurst() {
this("Iris", Thread.MIN_PRIORITY);
}
public MultiBurst(String name, int priority, int tc) {
public MultiBurst(String name, int priority) {
this.name = name;
this.priority = priority;
this.tc = tc;
instance = InstanceState.getInstanceId();
last = new AtomicLong(M.ms());
heartbeat = new Looper() {
@Override
protected long loop() {
if (instance != InstanceState.getInstanceId()) {
shutdownNow();
return -1;
}
if (M.ms() - last.get() > TimeUnit.MINUTES.toMillis(1) && service != null) {
service.shutdown();
service = null;
Iris.debug("Shutting down MultiBurst Pool " + getName() + " to conserve resources.");
}
return 30000;
}
};
heartbeat.setName(name + " Monitor");
heartbeat.start();
Iris.service(PreservationSVC.class).register(this);
}
private synchronized ExecutorService getService() {
last.set(M.ms());
if (service == null || service.isShutdown()) {
service = Executors.newFixedThreadPool(Math.max(tc, 1), r -> {
tid++;
Thread t = new Thread(r);
t.setName(name + " " + tid);
t.setPriority(priority);
t.setUncaughtExceptionHandler((et, e) ->
{
Iris.info("Exception encountered in " + et.getName());
e.printStackTrace();
});
service = new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
new ForkJoinPool.ForkJoinWorkerThreadFactory() {
int m = 0;
return t;
});
Iris.service(PreservationSVC.class).register(service);
Iris.debug("Started MultiBurst Pool " + name + " with " + tc + " threads at " + priority + " priority.");
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
worker.setPriority(priority);
worker.setName(name + " " + ++m);
return worker;
}
},
(t, e) -> e.printStackTrace(), true);
}
return service;
@@ -138,64 +108,15 @@ public class MultiBurst {
return getService().submit(o);
}
public CompletableFuture<?> complete(Runnable o) {
return CompletableFuture.runAsync(o, getService());
public Future<?> complete(Runnable o) {
return getService().submit(o);
}
public <T> CompletableFuture<T> completeValue(Supplier<T> o) {
return CompletableFuture.supplyAsync(o, getService());
public <T> Future<T> completeValue(Callable<T> o) {
return getService().submit(o);
}
public void shutdownNow() {
Iris.debug("Shutting down MultiBurst Pool " + heartbeat.getName() + ".");
heartbeat.interrupt();
if (service != null) {
service.shutdownNow().forEach(Runnable::run);
}
}
public void shutdown() {
Iris.debug("Shutting down MultiBurst Pool " + heartbeat.getName() + ".");
heartbeat.interrupt();
if (service != null) {
service.shutdown();
}
}
public void shutdownLater() {
if (service != null) {
try
{
service.submit(() -> {
J.sleep(3000);
Iris.debug("Shutting down MultiBurst Pool " + heartbeat.getName() + ".");
if (service != null) {
service.shutdown();
}
});
heartbeat.interrupt();
}
catch(Throwable e)
{
Iris.debug("Shutting down MultiBurst Pool " + heartbeat.getName() + ".");
if (service != null) {
service.shutdown();
}
heartbeat.interrupt();
}
}
}
public void shutdownAndAwait() {
Iris.debug("Shutting down MultiBurst Pool " + heartbeat.getName() + ".");
heartbeat.interrupt();
public void close() {
if (service != null) {
service.shutdown();
try {