diff --git a/divinemc-server/src/main/java/org/bxteam/divinemc/async/ExecutorShutdown.java b/divinemc-server/src/main/java/org/bxteam/divinemc/async/ExecutorShutdown.java index 33a64e4..5586e16 100644 --- a/divinemc-server/src/main/java/org/bxteam/divinemc/async/ExecutorShutdown.java +++ b/divinemc-server/src/main/java/org/bxteam/divinemc/async/ExecutorShutdown.java @@ -5,6 +5,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.bxteam.divinemc.async.pathfinding.AsyncPathProcessor; import org.bxteam.divinemc.async.tracking.MultithreadedTracker; +import org.bxteam.divinemc.config.DivineConfig; +import org.bxteam.divinemc.region.EnumRegionFileExtension; +import org.bxteam.divinemc.region.type.BufferedRegionFile; import java.util.concurrent.TimeUnit; @@ -13,6 +16,14 @@ public class ExecutorShutdown { public static final Logger LOGGER = LogManager.getLogger(ExecutorShutdown.class.getSimpleName()); public static void shutdown(MinecraftServer server) { + if (BufferedRegionFile.flusherInitialized && DivineConfig.MiscCategory.regionFileType == EnumRegionFileExtension.B_LINEAR) { + LOGGER.info("Shutting down buffered region executors..."); + + try { + BufferedRegionFile.shutdown(); + } catch (InterruptedException ignored) { } + } + if (server.mobSpawnExecutor != null && server.mobSpawnExecutor.thread.isAlive()) { LOGGER.info("Shutting down mob spawn executor..."); diff --git a/divinemc-server/src/main/java/org/bxteam/divinemc/region/BufferReleaser.java b/divinemc-server/src/main/java/org/bxteam/divinemc/region/BufferReleaser.java deleted file mode 100644 index 88739d1..0000000 --- a/divinemc-server/src/main/java/org/bxteam/divinemc/region/BufferReleaser.java +++ /dev/null @@ -1,34 +0,0 @@ -package org.bxteam.divinemc.region; - -import org.jetbrains.annotations.NotNull; - -import java.lang.reflect.Field; -import java.lang.reflect.Method; -import java.nio.ByteBuffer; - -public class BufferReleaser { - private static final Method CLEANER_METHOD; - private static final Object UNSAFE; - - static { - try { - Class unsafeClass = Class.forName("sun.misc.Unsafe"); - Field theUnsafe = unsafeClass.getDeclaredField("theUnsafe"); - theUnsafe.setAccessible(true); - UNSAFE = theUnsafe.get(null); - CLEANER_METHOD = unsafeClass.getMethod("invokeCleaner", ByteBuffer.class); - } catch (Exception ex) { - throw new RuntimeException("Unsafe init failed", ex); - } - } - - public static boolean clean(@NotNull ByteBuffer buffer) { - if (!buffer.isDirect()) return false; - try { - CLEANER_METHOD.invoke(UNSAFE, buffer); - return true; - } catch (Exception e) { - return false; - } - } -} diff --git a/divinemc-server/src/main/java/org/bxteam/divinemc/region/type/BufferedRegionFile.java b/divinemc-server/src/main/java/org/bxteam/divinemc/region/type/BufferedRegionFile.java index 45b7b93..b6c094c 100644 --- a/divinemc-server/src/main/java/org/bxteam/divinemc/region/type/BufferedRegionFile.java +++ b/divinemc-server/src/main/java/org/bxteam/divinemc/region/type/BufferedRegionFile.java @@ -1,16 +1,22 @@ package org.bxteam.divinemc.region.type; +import ca.spottedleaf.concurrentutil.util.ConcurrentUtil; import ca.spottedleaf.moonrise.patches.chunk_system.io.MoonriseRegionFileIO; import net.jpountz.xxhash.XXHash32; import net.jpountz.xxhash.XXHashFactory; import net.minecraft.nbt.CompoundTag; import net.minecraft.world.level.ChunkPos; -import org.bxteam.divinemc.region.BufferReleaser; +import org.bxteam.divinemc.region.EnumRegionFileExtension; import org.bxteam.divinemc.region.IRegionFile; +import org.bxteam.divinemc.config.DivineConfig; +import org.bxteam.divinemc.util.NamedAgnosticThreadFactory; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import it.unimi.dsi.fastutil.objects.ObjectArrayList; +import it.unimi.dsi.fastutil.objects.ObjectLinkedOpenHashSet; import java.io.*; +import java.lang.invoke.VarHandle; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.Files; @@ -19,20 +25,15 @@ import java.nio.file.StandardOpenOption; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.*; +import java.util.Arrays; +import java.util.List; +import java.util.Set; /** * A buffered region file implementation that provides efficient chunk storage and retrieval * with compression, checksums, and automatic compaction capabilities. * - *

This implementation includes: - *

- * *

For conversion tools between MCA and buffered region file formats, see: * LinearRegionFileFormatTools */ @@ -52,10 +53,24 @@ public class BufferedRegionFile implements IRegionFile { private byte compressionLevel = 6; private int xxHash32Seed = HASH_SEED; private FileChannel channel; + private boolean closed = false; + + private volatile boolean synced = true; + private volatile boolean beingSynced = false; + private volatile long lastWritten = 0L; + + private static final Set MANAGED_FILES = new ObjectLinkedOpenHashSet<>(); + private static volatile ScheduledFuture flusherChecker; + private static volatile Executor ioWorkerPool; + private static final Object FLUSHER_LOCK = new Object(); + public static volatile boolean flusherInitialized = false; + + private static final VarHandle SYNCED_HANDLE = ConcurrentUtil.getVarHandle(BufferedRegionFile.class, "synced", boolean.class); + private static final VarHandle BEING_SYNCED_HANDLE = ConcurrentUtil.getVarHandle(BufferedRegionFile.class, "beingSynced", boolean.class); + private static final VarHandle LAST_WRITTEN_HANDLE = ConcurrentUtil.getVarHandle(BufferedRegionFile.class, "lastWritten", long.class); public BufferedRegionFile(Path filePath, int compressionLevel) throws IOException { this(filePath); - this.compressionLevel = (byte) compressionLevel; } @@ -73,6 +88,179 @@ public class BufferedRegionFile implements IRegionFile { } this.readHeaders(); + + if (DivineConfig.MiscCategory.regionFileType == EnumRegionFileExtension.B_LINEAR) initializeFlusherIfNeeded(); + addToFlusherManagement(); + } + + private static void initializeFlusherIfNeeded() { + if (flusherInitialized) return; + + synchronized (FLUSHER_LOCK) { + if (flusherInitialized) { + return; + } + + final int nIoThreads = DivineConfig.MiscCategory.linearIoThreadCount; + final long checkIntervalMs = 20; + + ioWorkerPool = Executors.newFixedThreadPool(nIoThreads, + new NamedAgnosticThreadFactory<>( + "BufferedRegionFile I/O Worker", + (group, runnable, name) -> { + Thread thread = new Thread(group, runnable, name); + thread.setDaemon(true); + return thread; + }, + Thread.NORM_PRIORITY + ) + ); + + ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor( + new NamedAgnosticThreadFactory<>( + "BufferedRegionFile Flusher Checker", + (group, runnable, name) -> { + Thread thread = new Thread(group, runnable, name); + thread.setDaemon(true); + return thread; + }, + Thread.NORM_PRIORITY + ) + ); + + flusherChecker = scheduler.scheduleWithFixedDelay( + BufferedRegionFile::runFlusherCheck, + checkIntervalMs, + checkIntervalMs, + TimeUnit.MILLISECONDS + ); + + flusherInitialized = true; + } + } + + private static void runFlusherCheck() { + final long currentNanos = System.nanoTime(); + final BufferedRegionFile[] copied; + + synchronized (MANAGED_FILES) { + copied = Arrays.copyOf( + MANAGED_FILES.toArray(new BufferedRegionFile[0]), + MANAGED_FILES.size(), + BufferedRegionFile[].class + ); + } + + final List toRemove = new ObjectArrayList<>(); + for (BufferedRegionFile file : copied) { + if (!file.softReadLock()) { + continue; + } + + boolean closed; + + try { + closed = file.isClosedRaw(); + } finally { + file.releaseReadLock(); + } + + if (closed) { + toRemove.add(file); + continue; + } + + if (!file.shouldSync()) { + continue; + } + + final long lastWriteNanos = file.getLastWritten(); + final long timeElapsed = (currentNanos - lastWriteNanos) / 1_000_000; + final long flushTimeoutMs = DivineConfig.MiscCategory.linearIoFlushDelayMs; + + if (timeElapsed >= flushTimeoutMs) { + if (!file.markAsBeingSynced()) { + continue; + } + + ioWorkerPool.execute(() -> { + try { + file.flush(); + file.syncIfNeeded(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + } + + synchronized (MANAGED_FILES) { + for (BufferedRegionFile file : toRemove) { + MANAGED_FILES.remove(file); + } + } + } + + public static void shutdown() throws InterruptedException { + synchronized (FLUSHER_LOCK) { + if (!flusherInitialized) { + return; + } + + if (flusherChecker != null) { + flusherChecker.cancel(false); + } + + if (ioWorkerPool instanceof ExecutorService) { + ((ExecutorService) ioWorkerPool).shutdown(); + //noinspection StatementWithEmptyBody + while (!((ExecutorService) ioWorkerPool).awaitTermination(100, TimeUnit.MILLISECONDS)); + } + + flusherInitialized = false; + } + } + + private void addToFlusherManagement() { + synchronized (MANAGED_FILES) { + MANAGED_FILES.add(this); + } + } + + private void removeFromFlusherManagement() { + synchronized (MANAGED_FILES) { + MANAGED_FILES.remove(this); + } + } + + public boolean softReadLock() { + return this.fileAccessLock.readLock().tryLock(); + } + + public void releaseReadLock() { + this.fileAccessLock.readLock().unlock(); + } + + public boolean isClosedRaw() { + return this.closed; + } + + public boolean shouldSync() { + return !(boolean) SYNCED_HANDLE.get(this); + } + + public long getLastWritten() { + return (long) LAST_WRITTEN_HANDLE.get(this); + } + + public boolean markAsBeingSynced() { + return BEING_SYNCED_HANDLE.compareAndSet(this, false, true); + } + + public void syncIfNeeded() throws IOException { + if (this.channel != null && this.channel.isOpen()) { + this.channel.force(true); + } } private void readHeaders() throws IOException { @@ -98,8 +286,6 @@ public class BufferedRegionFile implements IRegionFile { this.currentAcquiredIndex = Math.max(this.currentAcquiredIndex, sector.offset + sector.length); } } - - BufferReleaser.clean(buffer); } private void writeHeaders() throws IOException { @@ -121,8 +307,6 @@ public class BufferedRegionFile implements IRegionFile { while (buffer.hasRemaining()) { offset += this.channel.write(buffer, offset); } - - BufferReleaser.clean(buffer); } private int sectorSize() { @@ -143,6 +327,10 @@ public class BufferedRegionFile implements IRegionFile { } private void flushInternal() throws IOException { + if (this.closed) { + return; + } + this.writeHeaders(); long spareSize = this.channel.size(); @@ -163,6 +351,7 @@ public class BufferedRegionFile implements IRegionFile { } private void closeInternal() throws IOException { + this.closed = true; this.writeHeaders(); this.channel.force(true); this.compact(); @@ -186,35 +375,36 @@ public class BufferedRegionFile implements IRegionFile { while (headerBuffer.hasRemaining()) { offsetHeader += tempChannel.write(headerBuffer, offsetHeader); } - BufferReleaser.clean(headerBuffer); - int offsetPointer = this.headerSize(); + long offsetPointer = this.headerSize(); + tempChannel.position(offsetPointer); + for (Sector sector : this.sectors) { if (!sector.hasData()) { continue; } - final ByteBuffer sectorData = sector.read(this.channel); - final int length = sectorData.remaining(); - - final Sector newRecalculated = new Sector(sector.index, offsetPointer, length); - offsetPointer += length; - this.sectors[sector.index] = newRecalculated; - - newRecalculated.hasData = true; - - long offset = newRecalculated.offset; - while (sectorData.hasRemaining()) { - offset += tempChannel.write(sectorData, offset); + long transferred = 0; + while (transferred < sector.length) { + transferred += this.channel.transferTo( + sector.offset + transferred, + sector.length - transferred, + tempChannel); } - BufferReleaser.clean(sectorData); + final Sector newRecalculated = new Sector(sector.index, offsetPointer, sector.length); + newRecalculated.hasData = true; + + offsetPointer += sector.length; + this.sectors[sector.index] = newRecalculated; } tempChannel.force(true); - this.currentAcquiredIndex = tempChannel.size(); + this.currentAcquiredIndex = offsetPointer; } + this.channel.close(); + Files.move( new File(this.filePath.toString() + ".tmp").toPath(), this.filePath, @@ -242,6 +432,9 @@ public class BufferedRegionFile implements IRegionFile { final Sector sector = this.sectors[chunkOrdinal]; sector.store(chunkData, this.channel); + + SYNCED_HANDLE.set(this, false); + LAST_WRITTEN_HANDLE.set(this, System.nanoTime()); } private @Nullable ByteBuffer readChunkDataRaw(int chunkOrdinal) throws IOException { @@ -260,6 +453,9 @@ public class BufferedRegionFile implements IRegionFile { sector.clear(); this.writeHeaders(); + + SYNCED_HANDLE.set(this, false); + LAST_WRITTEN_HANDLE.set(this, System.nanoTime()); } private static int getChunkIndex(int x, int z) { @@ -281,13 +477,12 @@ public class BufferedRegionFile implements IRegionFile { final ByteBuffer chunkSectionBuilder = ByteBuffer.allocateDirect(compressedData.remaining() + 4 + 8 + 4); chunkSectionBuilder.putInt(data.remaining()); // Uncompressed length - chunkSectionBuilder.putLong(System.nanoTime()); // Timestamp + chunkSectionBuilder.putLong(System.currentTimeMillis()); // Timestamp chunkSectionBuilder.putInt(xxHash32OfData); // xxHash32 of the original data chunkSectionBuilder.put(compressedData); // Compressed data chunkSectionBuilder.flip(); this.writeChunkDataRaw(chunkIndex, chunkSectionBuilder); - BufferReleaser.clean(chunkSectionBuilder); } private @Nullable ByteBuffer readChunk(int x, int z) throws IOException { @@ -297,17 +492,15 @@ public class BufferedRegionFile implements IRegionFile { return null; } - final int uncompressedLength = compressed.getInt(); // compressed length + final int uncompressedLength = compressed.getInt(); final long timestamp = compressed.getLong(); // TODO use this timestamp for something? - final int dataXXHash32 = compressed.getInt(); // XXHash32 for validation + final int dataXXHash32 = compressed.getInt(); final ByteBuffer decompressed = this.decompress(this.ensureDirectBuffer(compressed), uncompressedLength); - BufferReleaser.clean(compressed); - final IOException xxHash32CheckFailedEx = this.checkXXHash32(dataXXHash32, decompressed); if (xxHash32CheckFailedEx != null) { - throw xxHash32CheckFailedEx; // prevent from loading + throw xxHash32CheckFailedEx; } return decompressed; @@ -437,10 +630,8 @@ public class BufferedRegionFile implements IRegionFile { final byte[] dataBytes = new byte[data.remaining()]; data.get(dataBytes); - BufferReleaser.clean(data); - return new DataInputStream(new ByteArrayInputStream(dataBytes)); - }finally { + } finally { this.fileAccessLock.readLock().unlock(); } } @@ -450,7 +641,7 @@ public class BufferedRegionFile implements IRegionFile { this.fileAccessLock.readLock().lock(); try { return this.hasData(getChunkIndex(pos.x, pos.z)); - }finally { + } finally { this.fileAccessLock.readLock().unlock(); } } @@ -465,7 +656,7 @@ public class BufferedRegionFile implements IRegionFile { this.fileAccessLock.writeLock().lock(); try { this.clearChunkData(getChunkIndex(pos.x, pos.z)); - }finally { + } finally { this.fileAccessLock.writeLock().unlock(); } } @@ -515,7 +706,6 @@ public class BufferedRegionFile implements IRegionFile { public int getRecalculateCount() { return this.recalculateCount.get(); } - // MCC end @Override public MoonriseRegionFileIO.RegionDataController.WriteData moonrise$startWrite(CompoundTag data, ChunkPos pos) { @@ -531,8 +721,21 @@ public class BufferedRegionFile implements IRegionFile { public void flush() throws IOException { this.fileAccessLock.writeLock().lock(); try { - this.flushInternal(); - }finally { + if ((boolean) SYNCED_HANDLE.get(this)) { + return; + } + + if (!BEING_SYNCED_HANDLE.compareAndSet(this, false, true)) { + return; + } + + try { + this.flushInternal(); + SYNCED_HANDLE.set(this, true); + } finally { + BEING_SYNCED_HANDLE.set(this, false); + } + } finally { this.fileAccessLock.writeLock().unlock(); } } @@ -541,8 +744,9 @@ public class BufferedRegionFile implements IRegionFile { public void close() throws IOException { this.fileAccessLock.writeLock().lock(); try { + removeFromFlusherManagement(); this.closeInternal(); - }finally { + } finally { this.fileAccessLock.writeLock().unlock(); } } @@ -577,7 +781,7 @@ public class BufferedRegionFile implements IRegionFile { long offset = this.offset; while (newData.hasRemaining()) { - offset = channel.write(newData, offset); + offset += channel.write(newData, offset); } } @@ -611,7 +815,6 @@ public class BufferedRegionFile implements IRegionFile { } static int sizeOfSingle() { - // offset length hasData return Long.BYTES * 2 + 1; } } @@ -630,7 +833,7 @@ public class BufferedRegionFile implements IRegionFile { ByteBuffer bytebuffer = ByteBuffer.wrap(this.buf, 0, this.count); BufferedRegionFile.this.writeChunk(this.pos.x, this.pos.z, bytebuffer); - }finally { + } finally { BufferedRegionFile.this.fileAccessLock.writeLock().unlock(); } }