diff --git a/core/src/main/java/com/volmit/iris/core/commands/CommandDeveloper.java b/core/src/main/java/com/volmit/iris/core/commands/CommandDeveloper.java index e96eb29ed..3fc3df546 100644 --- a/core/src/main/java/com/volmit/iris/core/commands/CommandDeveloper.java +++ b/core/src/main/java/com/volmit/iris/core/commands/CommandDeveloper.java @@ -115,27 +115,6 @@ public class CommandDeveloper implements DecreeExecutor { } } - @Decree(description = "Test") - public void benchmarkMantle( - @Param(description = "The world to bench", aliases = {"world"}) - World world - ) throws IOException, ClassNotFoundException { - Engine engine = IrisToolbelt.access(world).getEngine(); - int maxHeight = engine.getTarget().getHeight(); - File folder = new File(Bukkit.getWorldContainer(), world.getName()); - int c = 0; - //MCAUtil.read() - - File tectonicplates = new File(folder, "mantle"); - for (File i : Objects.requireNonNull(tectonicplates.listFiles())) { - TectonicPlate.read(maxHeight, i, true); - c++; - Iris.info("Loaded count: " + c ); - - } - - } - @Decree(description = "Test") public void packBenchmark( @Param(description = "The pack to bench", aliases = {"pack"}, defaultValue = "overworld") diff --git a/core/src/main/java/com/volmit/iris/util/io/IO.java b/core/src/main/java/com/volmit/iris/util/io/IO.java index 247dfefb1..a848d13a0 100644 --- a/core/src/main/java/com/volmit/iris/util/io/IO.java +++ b/core/src/main/java/com/volmit/iris/util/io/IO.java @@ -24,14 +24,20 @@ import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.volmit.iris.Iris; import com.volmit.iris.util.format.Form; +import com.volmit.iris.util.scheduling.J; import org.apache.commons.io.function.IOConsumer; import org.apache.commons.io.function.IOFunction; import java.io.*; +import java.nio.channels.Channels; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.channels.OverlappingFileLockException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; import java.security.DigestInputStream; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; @@ -1668,13 +1674,24 @@ public class IO { dir.mkdirs(); dir.deleteOnExit(); File temp = File.createTempFile("iris",".bin", dir); - try { + try (var target = FileChannel.open(file.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE, StandardOpenOption.SYNC)) { + lock(target); + try (var out = builder.apply(new FileOutputStream(temp))) { action.accept(out); } - Files.move(temp.toPath(), file.toPath(), StandardCopyOption.REPLACE_EXISTING); + Files.copy(temp.toPath(), Channels.newOutputStream(target)); } finally { temp.delete(); } } + + public static FileLock lock(FileChannel channel) throws IOException { + while (true) { + try { + return channel.lock(); + } catch (OverlappingFileLockException e) {} + J.sleep(1); + } + } } diff --git a/core/src/main/java/com/volmit/iris/util/mantle/Mantle.java b/core/src/main/java/com/volmit/iris/util/mantle/Mantle.java index c48c43bd8..bcb211937 100644 --- a/core/src/main/java/com/volmit/iris/util/mantle/Mantle.java +++ b/core/src/main/java/com/volmit/iris/util/mantle/Mantle.java @@ -35,6 +35,7 @@ import com.volmit.iris.util.format.C; import com.volmit.iris.util.format.Form; import com.volmit.iris.util.function.Consumer4; import com.volmit.iris.util.io.IO; +import com.volmit.iris.util.mantle.io.IOWorker; import com.volmit.iris.util.math.M; import com.volmit.iris.util.matter.Matter; import com.volmit.iris.util.matter.MatterSlice; @@ -44,9 +45,7 @@ import com.volmit.iris.util.parallel.MultiBurst; import lombok.Getter; import org.bukkit.Chunk; -import java.io.EOFException; -import java.io.File; -import java.io.IOException; +import java.io.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -69,6 +68,7 @@ public class Mantle { private final MultiBurst ioBurst; private final Semaphore ioTrim; private final Semaphore ioTectonicUnload; + private final IOWorker worker; private final AtomicDouble adjustedIdleDuration; private final KSet toUnload; @@ -91,6 +91,7 @@ public class Mantle { ioBurst = MultiBurst.burst; adjustedIdleDuration = new AtomicDouble(0); toUnload = new KSet<>(); + worker = new IOWorker(dataFolder, worldHeight); Iris.debug("Opened The Mantle " + C.DARK_AQUA + dataFolder.getAbsolutePath()); } @@ -379,7 +380,7 @@ public class Mantle { loadedRegions.forEach((i, plate) -> b.queue(() -> { try { plate.close(); - plate.write(fileForRegion(dataFolder, i, false)); + worker.write(fileForRegion(dataFolder, i, false).getName(), plate); oldFileForRegion(dataFolder, i).delete(); } catch (Throwable e) { Iris.error("Failed to write Tectonic Plate " + C.DARK_GREEN + Cache.keyX(i) + " " + Cache.keyZ(i)); @@ -394,6 +395,11 @@ public class Mantle { } catch (Throwable e) { Iris.reportError(e); } + try { + worker.close(); + } catch (Throwable e) { + Iris.reportError(e); + } IO.delete(new File(dataFolder, ".tmp")); Iris.debug("The Mantle has Closed " + C.DARK_AQUA + dataFolder.getAbsolutePath()); @@ -484,7 +490,7 @@ public class Mantle { } try { - m.write(fileForRegion(dataFolder, id, false)); + worker.write(fileForRegion(dataFolder, id, false).getName(), m); oldFileForRegion(dataFolder, id).delete(); loadedRegions.remove(id, m); lastUse.remove(id); @@ -580,7 +586,7 @@ public class Mantle { if (file.exists()) { try { Iris.addPanic("reading.tectonic-plate", file.getAbsolutePath()); - region = TectonicPlate.read(worldHeight, file, file.getName().startsWith("pv.")); + region = worker.read(file.getName()); if (region.getX() != x || region.getZ() != z) { Iris.warn("Loaded Tectonic Plate " + x + "," + z + " but read it as " + region.getX() + "," + region.getZ() + "... Assuming " + x + "," + z); diff --git a/core/src/main/java/com/volmit/iris/util/mantle/TectonicPlate.java b/core/src/main/java/com/volmit/iris/util/mantle/TectonicPlate.java index 181ce9cd2..a10bfbc26 100644 --- a/core/src/main/java/com/volmit/iris/util/mantle/TectonicPlate.java +++ b/core/src/main/java/com/volmit/iris/util/mantle/TectonicPlate.java @@ -19,26 +19,14 @@ package com.volmit.iris.util.mantle; import com.volmit.iris.Iris; -import com.volmit.iris.core.IrisSettings; import com.volmit.iris.engine.EnginePanic; import com.volmit.iris.engine.data.cache.Cache; import com.volmit.iris.util.data.Varint; import com.volmit.iris.util.documentation.ChunkCoordinates; -import com.volmit.iris.util.format.C; -import com.volmit.iris.util.format.Form; import com.volmit.iris.util.io.CountingDataInputStream; -import com.volmit.iris.util.io.IO; -import com.volmit.iris.util.scheduling.PrecisionStopwatch; import lombok.Getter; -import net.jpountz.lz4.LZ4BlockInputStream; -import net.jpountz.lz4.LZ4BlockOutputStream; import java.io.*; -import java.nio.channels.Channels; -import java.nio.channels.FileChannel; -import java.nio.file.Files; -import java.nio.file.StandardCopyOption; -import java.nio.file.StandardOpenOption; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReferenceArray; @@ -111,31 +99,6 @@ public class TectonicPlate { } } - public static TectonicPlate read(int worldHeight, File file, boolean versioned) throws IOException { - try (FileChannel fc = FileChannel.open(file.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.SYNC)) { - fc.lock(); - - InputStream fin = Channels.newInputStream(fc); - LZ4BlockInputStream lz4 = new LZ4BlockInputStream(fin); - BufferedInputStream bis = new BufferedInputStream(lz4); - try (CountingDataInputStream din = CountingDataInputStream.wrap(bis)) { - return new TectonicPlate(worldHeight, din, versioned); - } - } finally { - if (IrisSettings.get().getGeneral().isDumpMantleOnError() && errors.get()) { - File dump = Iris.instance.getDataFolder("dump", file.getName() + ".bin"); - try (FileChannel fc = FileChannel.open(file.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.SYNC)) { - fc.lock(); - - InputStream fin = Channels.newInputStream(fc); - LZ4BlockInputStream lz4 = new LZ4BlockInputStream(fin); - Files.copy(lz4, dump.toPath(), StandardCopyOption.REPLACE_EXISTING); - } - } - errors.remove(); - } - } - public boolean inUse() { for (int i = 0; i < chunks.length(); i++) { MantleChunk chunk = chunks.get(i); @@ -223,18 +186,6 @@ public class TectonicPlate { return Cache.to1D(x, z, 0, 32, 32); } - /** - * Write this tectonic plate to file - * - * @param file the file to writeNodeData it to - * @throws IOException shit happens - */ - public void write(File file) throws IOException { - PrecisionStopwatch p = PrecisionStopwatch.start(); - IO.write(file, out -> new DataOutputStream(new LZ4BlockOutputStream(out)), this::write); - Iris.debug("Saved Tectonic Plate " + C.DARK_GREEN + file.getName() + C.RED + " in " + Form.duration(p.getMilliseconds(), 2)); - } - /** * Write this tectonic plate to a data stream * @@ -268,4 +219,12 @@ public class TectonicPlate { public static void addError() { errors.set(true); } + + public static boolean hasError() { + try { + return errors.get(); + } finally { + errors.remove(); + } + } } diff --git a/core/src/main/java/com/volmit/iris/util/mantle/io/DelegateStream.java b/core/src/main/java/com/volmit/iris/util/mantle/io/DelegateStream.java new file mode 100644 index 000000000..2ccbbbe14 --- /dev/null +++ b/core/src/main/java/com/volmit/iris/util/mantle/io/DelegateStream.java @@ -0,0 +1,123 @@ +/* + * Iris is a World Generator for Minecraft Bukkit Servers + * Copyright (c) 2022 Arcane Arts (Volmit Software) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package com.volmit.iris.util.mantle.io; + +import org.jetbrains.annotations.NotNull; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.channels.Channels; +import java.nio.channels.FileChannel; + +public class DelegateStream { + + public static InputStream read(FileChannel channel) throws IOException { + channel.position(0); + return new Input(channel); + } + + public static OutputStream write(FileChannel channel) throws IOException { + channel.position(0); + return new Output(channel); + } + + private static class Input extends InputStream { + private final InputStream delegate; + + private Input(FileChannel channel) { + this.delegate = Channels.newInputStream(channel); + } + + @Override + public int available() throws IOException { + return delegate.available(); + } + + @Override + public int read() throws IOException { + return delegate.read(); + } + + @Override + public int read(byte @NotNull [] b, int off, int len) throws IOException { + return delegate.read(b, off, len); + } + + @Override + public byte @NotNull [] readAllBytes() throws IOException { + return delegate.readAllBytes(); + } + + @Override + public int readNBytes(byte[] b, int off, int len) throws IOException { + return delegate.readNBytes(b, off, len); + } + + @Override + public byte @NotNull [] readNBytes(int len) throws IOException { + return delegate.readNBytes(len); + } + + @Override + public long skip(long n) throws IOException { + return delegate.skip(n); + } + + @Override + public void skipNBytes(long n) throws IOException { + delegate.skipNBytes(n); + } + + @Override + public long transferTo(OutputStream out) throws IOException { + return delegate.transferTo(out); + } + } + + private static class Output extends OutputStream { + private final FileChannel channel; + private final OutputStream delegate; + + private Output(FileChannel channel) { + this.channel = channel; + this.delegate = Channels.newOutputStream(channel); + } + + @Override + public void write(int b) throws IOException { + delegate.write(b); + } + + @Override + public void write(byte @NotNull [] b, int off, int len) throws IOException { + delegate.write(b, off, len); + } + + @Override + public void flush() throws IOException { + channel.truncate(channel.position()); + } + + @Override + public void close() throws IOException { + channel.force(true); + } + } +} diff --git a/core/src/main/java/com/volmit/iris/util/mantle/io/Holder.java b/core/src/main/java/com/volmit/iris/util/mantle/io/Holder.java new file mode 100644 index 000000000..fa36ce9c5 --- /dev/null +++ b/core/src/main/java/com/volmit/iris/util/mantle/io/Holder.java @@ -0,0 +1,57 @@ +/* + * Iris is a World Generator for Minecraft Bukkit Servers + * Copyright (c) 2022 Arcane Arts (Volmit Software) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package com.volmit.iris.util.mantle.io; + +import com.volmit.iris.util.io.IO; + +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.util.concurrent.Semaphore; + +class Holder { + private final FileChannel channel; + private final Semaphore semaphore = new Semaphore(1); + private volatile boolean closed; + + Holder(FileChannel channel) throws IOException { + this.channel = channel; + IO.lock(channel); + } + + SynchronizedChannel acquire() { + semaphore.acquireUninterruptibly(); + if (closed) { + semaphore.release(); + return null; + } + + return new SynchronizedChannel(channel, semaphore); + } + + void close() throws IOException { + semaphore.acquireUninterruptibly(); + try { + if (closed) return; + closed = true; + channel.close(); + } finally { + semaphore.release(); + } + } +} diff --git a/core/src/main/java/com/volmit/iris/util/mantle/io/IOWorker.java b/core/src/main/java/com/volmit/iris/util/mantle/io/IOWorker.java new file mode 100644 index 000000000..d2edb4995 --- /dev/null +++ b/core/src/main/java/com/volmit/iris/util/mantle/io/IOWorker.java @@ -0,0 +1,130 @@ +/* + * Iris is a World Generator for Minecraft Bukkit Servers + * Copyright (c) 2022 Arcane Arts (Volmit Software) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package com.volmit.iris.util.mantle.io; + +import com.volmit.iris.Iris; +import com.volmit.iris.core.IrisSettings; +import com.volmit.iris.util.format.C; +import com.volmit.iris.util.format.Form; +import com.volmit.iris.util.io.CountingDataInputStream; +import com.volmit.iris.util.mantle.TectonicPlate; +import com.volmit.iris.util.scheduling.PrecisionStopwatch; +import it.unimi.dsi.fastutil.objects.Object2ObjectLinkedOpenHashMap; +import net.jpountz.lz4.LZ4BlockInputStream; +import net.jpountz.lz4.LZ4BlockOutputStream; + +import java.io.*; +import java.nio.channels.FileChannel; +import java.nio.file.*; +import java.util.Objects; +import java.util.Set; + +public class IOWorker { + private static final Set OPTIONS = Set.of(StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE, StandardOpenOption.SYNC); + private static final int MAX_CACHE_SIZE = 128; + + private final Path root; + private final File tmp; + private final int worldHeight; + + private final Object2ObjectLinkedOpenHashMap cache = new Object2ObjectLinkedOpenHashMap<>(); + + public IOWorker(File root, int worldHeight) { + this.root = root.toPath(); + this.tmp = new File(root, ".tmp"); + this.worldHeight = worldHeight; + } + + public TectonicPlate read(final String name) throws IOException { + PrecisionStopwatch p = PrecisionStopwatch.start(); + try (var channel = getChannel(name)) { + var raw = channel.read(); + var lz4 = new LZ4BlockInputStream(raw); + var buffered = new BufferedInputStream(lz4); + try (var in = CountingDataInputStream.wrap(buffered)) { + return new TectonicPlate(worldHeight, in, name.startsWith("pv.")); + } finally { + if (TectonicPlate.hasError() && IrisSettings.get().getGeneral().isDumpMantleOnError()) { + File dump = Iris.instance.getDataFolder("dump", name + ".bin"); + Files.copy(new LZ4BlockInputStream(channel.read()), dump.toPath(), StandardCopyOption.REPLACE_EXISTING); + } else { + Iris.debug("Read Tectonic Plate " + C.DARK_GREEN + name + C.RED + " in " + Form.duration(p.getMilliseconds(), 2)); + } + } + } + } + + public void write(final String name, final TectonicPlate plate) throws IOException { + PrecisionStopwatch p = PrecisionStopwatch.start(); + try (var channel = getChannel(name)) { + tmp.mkdirs(); + File file = File.createTempFile("iris", ".bin", tmp); + try { + try (var tmp = new DataOutputStream(new LZ4BlockOutputStream(new FileOutputStream(file)))) { + plate.write(tmp); + } + + try (var out = channel.write()) { + Files.copy(file.toPath(), out); + out.flush(); + } + } finally { + file.delete(); + } + } + Iris.debug("Saved Tectonic Plate " + C.DARK_GREEN + name + C.RED + " in " + Form.duration(p.getMilliseconds(), 2)); + } + + public void close() throws IOException { + synchronized (cache) { + for (Holder h : cache.values()) { + h.close(); + } + + cache.clear(); + } + } + + private SynchronizedChannel getChannel(final String name) throws IOException { + PrecisionStopwatch p = PrecisionStopwatch.start(); + try { + synchronized (cache) { + Holder holder = cache.getAndMoveToFirst(name); + if (holder != null) { + var channel = holder.acquire(); + if (channel != null) { + return channel; + } + } + + if (cache.size() >= MAX_CACHE_SIZE) { + var last = cache.removeLast(); + last.close(); + } + + + holder = new Holder(FileChannel.open(root.resolve(name), OPTIONS)); + cache.putAndMoveToFirst(name, holder); + return Objects.requireNonNull(holder.acquire()); + } + } finally { + Iris.debug("Acquired Channel for " + C.DARK_GREEN + name + C.RED + " in " + Form.duration(p.getMilliseconds(), 2)); + } + } +} diff --git a/core/src/main/java/com/volmit/iris/util/mantle/io/SynchronizedChannel.java b/core/src/main/java/com/volmit/iris/util/mantle/io/SynchronizedChannel.java new file mode 100644 index 000000000..a26d02967 --- /dev/null +++ b/core/src/main/java/com/volmit/iris/util/mantle/io/SynchronizedChannel.java @@ -0,0 +1,54 @@ +/* + * Iris is a World Generator for Minecraft Bukkit Servers + * Copyright (c) 2022 Arcane Arts (Volmit Software) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package com.volmit.iris.util.mantle.io; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.channels.FileChannel; +import java.util.concurrent.Semaphore; + +public class SynchronizedChannel implements Closeable { + private final FileChannel channel; + private final Semaphore lock; + private transient boolean closed; + + SynchronizedChannel(FileChannel channel, Semaphore lock) { + this.channel = channel; + this.lock = lock; + } + + public InputStream read() throws IOException { + if (closed) throw new IOException("Channel is closed!"); + return DelegateStream.read(channel); + } + + public OutputStream write() throws IOException { + if (closed) throw new IOException("Channel is closed!"); + return DelegateStream.write(channel); + } + + @Override + public void close() throws IOException { + if (closed) return; + closed = true; + lock.release(); + } +}