mirror of https://github.com/BX-Team/DivineMC.git (synced 2025-12-19 14:59:25 +00:00)
improve buffered
@@ -5,6 +5,9 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bxteam.divinemc.async.pathfinding.AsyncPathProcessor;
import org.bxteam.divinemc.async.tracking.MultithreadedTracker;
import org.bxteam.divinemc.config.DivineConfig;
import org.bxteam.divinemc.region.EnumRegionFileExtension;
import org.bxteam.divinemc.region.type.BufferedRegionFile;

import java.util.concurrent.TimeUnit;

@@ -13,6 +16,14 @@ public class ExecutorShutdown {
    public static final Logger LOGGER = LogManager.getLogger(ExecutorShutdown.class.getSimpleName());

    public static void shutdown(MinecraftServer server) {
        if (BufferedRegionFile.flusherInitialized && DivineConfig.MiscCategory.regionFileType == EnumRegionFileExtension.B_LINEAR) {
            LOGGER.info("Shutting down buffered region executors...");

            try {
                BufferedRegionFile.shutdown();
            } catch (InterruptedException ignored) { }
        }

        if (server.mobSpawnExecutor != null && server.mobSpawnExecutor.thread.isAlive()) {
            LOGGER.info("Shutting down mob spawn executor...");
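For context, BufferedRegionFile.shutdown() called above blocks until the flusher's worker pool has drained. A minimal sketch of that graceful-termination pattern, with illustrative names that are not DivineMC's actual fields, could look like this:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class PoolShutdownSketch {
    // Stop accepting new tasks, then poll until previously submitted work drains,
    // mirroring the awaitTermination loop used by BufferedRegionFile.shutdown().
    static void shutdownGracefully(ExecutorService pool) throws InterruptedException {
        pool.shutdown();
        while (!pool.awaitTermination(100, TimeUnit.MILLISECONDS)) {
            // queued tasks are still draining; keep waiting
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> System.out.println("flush pending region file"));
        shutdownGracefully(pool);
    }
}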
@@ -1,34 +0,0 @@
package org.bxteam.divinemc.region;

import org.jetbrains.annotations.NotNull;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;

public class BufferReleaser {
    private static final Method CLEANER_METHOD;
    private static final Object UNSAFE;

    static {
        try {
            Class<?> unsafeClass = Class.forName("sun.misc.Unsafe");
            Field theUnsafe = unsafeClass.getDeclaredField("theUnsafe");
            theUnsafe.setAccessible(true);
            UNSAFE = theUnsafe.get(null);
            CLEANER_METHOD = unsafeClass.getMethod("invokeCleaner", ByteBuffer.class);
        } catch (Exception ex) {
            throw new RuntimeException("Unsafe init failed", ex);
        }
    }

    public static boolean clean(@NotNull ByteBuffer buffer) {
        if (!buffer.isDirect()) return false;
        try {
            CLEANER_METHOD.invoke(UNSAFE, buffer);
            return true;
        } catch (Exception e) {
            return false;
        }
    }
}
@@ -1,16 +1,22 @@
package org.bxteam.divinemc.region.type;

import ca.spottedleaf.concurrentutil.util.ConcurrentUtil;
import ca.spottedleaf.moonrise.patches.chunk_system.io.MoonriseRegionFileIO;
import net.jpountz.xxhash.XXHash32;
import net.jpountz.xxhash.XXHashFactory;
import net.minecraft.nbt.CompoundTag;
import net.minecraft.world.level.ChunkPos;
import org.bxteam.divinemc.region.BufferReleaser;
import org.bxteam.divinemc.region.EnumRegionFileExtension;
import org.bxteam.divinemc.region.IRegionFile;
import org.bxteam.divinemc.config.DivineConfig;
import org.bxteam.divinemc.util.NamedAgnosticThreadFactory;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import it.unimi.dsi.fastutil.objects.ObjectLinkedOpenHashSet;

import java.io.*;
import java.lang.invoke.VarHandle;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
@@ -19,20 +25,15 @@ import java.nio.file.StandardOpenOption;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.*;
import java.util.Arrays;
import java.util.List;
import java.util.Set;

/**
 * A buffered region file implementation that provides efficient chunk storage and retrieval
 * with compression, checksums, and automatic compaction capabilities.
 *
 * <p>This implementation includes:
 * <ul>
 *     <li>Zstandard compression for chunk data</li>
 *     <li>XXHash32 checksums for data integrity verification</li>
 *     <li>Automatic file compaction when fragmentation exceeds thresholds</li>
 *     <li>Thread-safe operations with read-write locks</li>
 *     <li>Direct ByteBuffer usage for memory efficiency</li>
 * </ul>
 *
 * <p>For conversion tools between MCA and buffered region file formats, see:
 * <a href="https://github.com/NONPLAYT/LinearRegionFileFormatTools">LinearRegionFileFormatTools</a>
 */
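As a rough illustration of the per-chunk record this class writes further down in the diff (int uncompressed length, long timestamp, int xxHash32 of the raw data, then the compressed payload), the following self-contained sketch builds and re-parses such a header. It treats the payload as opaque bytes instead of calling a real Zstandard codec, uses seed 0 where the real class uses its own xxHash32Seed, and all names are illustrative.

import net.jpountz.xxhash.XXHash32;
import net.jpountz.xxhash.XXHashFactory;

import java.nio.ByteBuffer;

// Sketch only: mirrors the on-disk chunk record shape, not the actual implementation.
final class ChunkRecordSketch {
    private static final XXHash32 XX_HASH = XXHashFactory.fastestInstance().hash32();
    private static final int SEED = 0; // the real class uses a configurable seed

    static ByteBuffer encode(byte[] rawChunkData) {
        int checksum = XX_HASH.hash(rawChunkData, 0, rawChunkData.length, SEED);
        ByteBuffer record = ByteBuffer.allocate(4 + 8 + 4 + rawChunkData.length);
        record.putInt(rawChunkData.length);          // uncompressed length
        record.putLong(System.currentTimeMillis());  // timestamp
        record.putInt(checksum);                     // xxHash32 of the original data
        record.put(rawChunkData);                    // payload (Zstandard-compressed in the real format)
        record.flip();
        return record;
    }

    static byte[] decode(ByteBuffer record) {
        int length = record.getInt();
        long timestamp = record.getLong();           // currently informational only
        int expectedChecksum = record.getInt();
        byte[] payload = new byte[length];
        record.get(payload);
        if (XX_HASH.hash(payload, 0, payload.length, SEED) != expectedChecksum) {
            throw new IllegalStateException("chunk data failed xxHash32 verification");
        }
        return payload;
    }
}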
@@ -52,10 +53,24 @@ public class BufferedRegionFile implements IRegionFile {
    private byte compressionLevel = 6;
    private int xxHash32Seed = HASH_SEED;
    private FileChannel channel;
    private boolean closed = false;

    private volatile boolean synced = true;
    private volatile boolean beingSynced = false;
    private volatile long lastWritten = 0L;

    private static final Set<BufferedRegionFile> MANAGED_FILES = new ObjectLinkedOpenHashSet<>();
    private static volatile ScheduledFuture<?> flusherChecker;
    private static volatile Executor ioWorkerPool;
    private static final Object FLUSHER_LOCK = new Object();
    public static volatile boolean flusherInitialized = false;

    private static final VarHandle SYNCED_HANDLE = ConcurrentUtil.getVarHandle(BufferedRegionFile.class, "synced", boolean.class);
    private static final VarHandle BEING_SYNCED_HANDLE = ConcurrentUtil.getVarHandle(BufferedRegionFile.class, "beingSynced", boolean.class);
    private static final VarHandle LAST_WRITTEN_HANDLE = ConcurrentUtil.getVarHandle(BufferedRegionFile.class, "lastWritten", long.class);

    public BufferedRegionFile(Path filePath, int compressionLevel) throws IOException {
        this(filePath);

        this.compressionLevel = (byte) compressionLevel;
    }
@@ -73,6 +88,179 @@ public class BufferedRegionFile implements IRegionFile {
        }

        this.readHeaders();

        if (DivineConfig.MiscCategory.regionFileType == EnumRegionFileExtension.B_LINEAR) initializeFlusherIfNeeded();
        addToFlusherManagement();
    }

    private static void initializeFlusherIfNeeded() {
        if (flusherInitialized) return;

        synchronized (FLUSHER_LOCK) {
            if (flusherInitialized) {
                return;
            }

            final int nIoThreads = DivineConfig.MiscCategory.linearIoThreadCount;
            final long checkIntervalMs = 20;

            ioWorkerPool = Executors.newFixedThreadPool(nIoThreads,
                new NamedAgnosticThreadFactory<>(
                    "BufferedRegionFile I/O Worker",
                    (group, runnable, name) -> {
                        Thread thread = new Thread(group, runnable, name);
                        thread.setDaemon(true);
                        return thread;
                    },
                    Thread.NORM_PRIORITY
                )
            );

            ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
                new NamedAgnosticThreadFactory<>(
                    "BufferedRegionFile Flusher Checker",
                    (group, runnable, name) -> {
                        Thread thread = new Thread(group, runnable, name);
                        thread.setDaemon(true);
                        return thread;
                    },
                    Thread.NORM_PRIORITY
                )
            );

            flusherChecker = scheduler.scheduleWithFixedDelay(
                BufferedRegionFile::runFlusherCheck,
                checkIntervalMs,
                checkIntervalMs,
                TimeUnit.MILLISECONDS
            );

            flusherInitialized = true;
        }
    }
    private static void runFlusherCheck() {
        final long currentNanos = System.nanoTime();
        final BufferedRegionFile[] copied;

        synchronized (MANAGED_FILES) {
            copied = Arrays.copyOf(
                MANAGED_FILES.toArray(new BufferedRegionFile[0]),
                MANAGED_FILES.size(),
                BufferedRegionFile[].class
            );
        }

        final List<BufferedRegionFile> toRemove = new ObjectArrayList<>();
        for (BufferedRegionFile file : copied) {
            if (!file.softReadLock()) {
                continue;
            }

            boolean closed;

            try {
                closed = file.isClosedRaw();
            } finally {
                file.releaseReadLock();
            }

            if (closed) {
                toRemove.add(file);
                continue;
            }

            if (!file.shouldSync()) {
                continue;
            }

            final long lastWriteNanos = file.getLastWritten();
            final long timeElapsed = (currentNanos - lastWriteNanos) / 1_000_000;
            final long flushTimeoutMs = DivineConfig.MiscCategory.linearIoFlushDelayMs;

            if (timeElapsed >= flushTimeoutMs) {
                if (!file.markAsBeingSynced()) {
                    continue;
                }

                ioWorkerPool.execute(() -> {
                    try {
                        file.flush();
                        file.syncIfNeeded();
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                });
            }
        }

        synchronized (MANAGED_FILES) {
            for (BufferedRegionFile file : toRemove) {
                MANAGED_FILES.remove(file);
            }
        }
    }
    public static void shutdown() throws InterruptedException {
        synchronized (FLUSHER_LOCK) {
            if (!flusherInitialized) {
                return;
            }

            if (flusherChecker != null) {
                flusherChecker.cancel(false);
            }

            if (ioWorkerPool instanceof ExecutorService) {
                ((ExecutorService) ioWorkerPool).shutdown();
                //noinspection StatementWithEmptyBody
                while (!((ExecutorService) ioWorkerPool).awaitTermination(100, TimeUnit.MILLISECONDS));
            }

            flusherInitialized = false;
        }
    }

    private void addToFlusherManagement() {
        synchronized (MANAGED_FILES) {
            MANAGED_FILES.add(this);
        }
    }

    private void removeFromFlusherManagement() {
        synchronized (MANAGED_FILES) {
            MANAGED_FILES.remove(this);
        }
    }

    public boolean softReadLock() {
        return this.fileAccessLock.readLock().tryLock();
    }

    public void releaseReadLock() {
        this.fileAccessLock.readLock().unlock();
    }

    public boolean isClosedRaw() {
        return this.closed;
    }

    public boolean shouldSync() {
        return !(boolean) SYNCED_HANDLE.get(this);
    }

    public long getLastWritten() {
        return (long) LAST_WRITTEN_HANDLE.get(this);
    }

    public boolean markAsBeingSynced() {
        return BEING_SYNCED_HANDLE.compareAndSet(this, false, true);
    }

    public void syncIfNeeded() throws IOException {
        if (this.channel != null && this.channel.isOpen()) {
            this.channel.force(true);
        }
    }

    private void readHeaders() throws IOException {
@@ -98,8 +286,6 @@ public class BufferedRegionFile implements IRegionFile {
                this.currentAcquiredIndex = Math.max(this.currentAcquiredIndex, sector.offset + sector.length);
            }
        }

        BufferReleaser.clean(buffer);
    }

    private void writeHeaders() throws IOException {
@@ -121,8 +307,6 @@ public class BufferedRegionFile implements IRegionFile {
        while (buffer.hasRemaining()) {
            offset += this.channel.write(buffer, offset);
        }

        BufferReleaser.clean(buffer);
    }

    private int sectorSize() {
@@ -143,6 +327,10 @@ public class BufferedRegionFile implements IRegionFile {
    }

    private void flushInternal() throws IOException {
        if (this.closed) {
            return;
        }

        this.writeHeaders();

        long spareSize = this.channel.size();
@@ -163,6 +351,7 @@ public class BufferedRegionFile implements IRegionFile {
    }

    private void closeInternal() throws IOException {
        this.closed = true;
        this.writeHeaders();
        this.channel.force(true);
        this.compact();
@@ -186,35 +375,36 @@ public class BufferedRegionFile implements IRegionFile {
            while (headerBuffer.hasRemaining()) {
                offsetHeader += tempChannel.write(headerBuffer, offsetHeader);
            }
            BufferReleaser.clean(headerBuffer);

            int offsetPointer = this.headerSize();
            long offsetPointer = this.headerSize();
            tempChannel.position(offsetPointer);

            for (Sector sector : this.sectors) {
                if (!sector.hasData()) {
                    continue;
                }

                final ByteBuffer sectorData = sector.read(this.channel);
                final int length = sectorData.remaining();

                final Sector newRecalculated = new Sector(sector.index, offsetPointer, length);
                offsetPointer += length;
                this.sectors[sector.index] = newRecalculated;

                newRecalculated.hasData = true;

                long offset = newRecalculated.offset;
                while (sectorData.hasRemaining()) {
                    offset += tempChannel.write(sectorData, offset);
                long transferred = 0;
                while (transferred < sector.length) {
                    transferred += this.channel.transferTo(
                        sector.offset + transferred,
                        sector.length - transferred,
                        tempChannel);
                }

                BufferReleaser.clean(sectorData);
                final Sector newRecalculated = new Sector(sector.index, offsetPointer, sector.length);
                newRecalculated.hasData = true;

                offsetPointer += sector.length;
                this.sectors[sector.index] = newRecalculated;
            }

            tempChannel.force(true);
            this.currentAcquiredIndex = tempChannel.size();
            this.currentAcquiredIndex = offsetPointer;
        }

        this.channel.close();

        Files.move(
            new File(this.filePath.toString() + ".tmp").toPath(),
            this.filePath,
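The compaction hunk above replaces "read each sector into a direct buffer, then write it out" with channel-to-channel copies via FileChannel.transferTo, which may transfer fewer bytes than requested and therefore has to be looped. A standalone sketch of that copy loop, with illustrative names rather than the DivineMC types:

import java.io.IOException;
import java.nio.channels.FileChannel;

final class ChannelCopySketch {
    // Copy `length` bytes starting at `offset` from `source` into `target`.
    // transferTo may copy fewer bytes than requested, so loop until done.
    static void copyRange(FileChannel source, FileChannel target,
                          long offset, long length) throws IOException {
        long transferred = 0;
        while (transferred < length) {
            long n = source.transferTo(offset + transferred,
                                       length - transferred,
                                       target);
            if (n <= 0) {
                throw new IOException("transferTo made no progress");
            }
            transferred += n;
        }
    }
}

Note that transferTo writes at the destination channel's current position, which is why the patched code positions tempChannel once and then appends sectors sequentially.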
@@ -242,6 +432,9 @@ public class BufferedRegionFile implements IRegionFile {
        final Sector sector = this.sectors[chunkOrdinal];

        sector.store(chunkData, this.channel);

        SYNCED_HANDLE.set(this, false);
        LAST_WRITTEN_HANDLE.set(this, System.nanoTime());
    }

    private @Nullable ByteBuffer readChunkDataRaw(int chunkOrdinal) throws IOException {
@@ -260,6 +453,9 @@ public class BufferedRegionFile implements IRegionFile {
        sector.clear();

        this.writeHeaders();

        SYNCED_HANDLE.set(this, false);
        LAST_WRITTEN_HANDLE.set(this, System.nanoTime());
    }

    private static int getChunkIndex(int x, int z) {
@@ -281,13 +477,12 @@ public class BufferedRegionFile implements IRegionFile {
        final ByteBuffer chunkSectionBuilder = ByteBuffer.allocateDirect(compressedData.remaining() + 4 + 8 + 4);

        chunkSectionBuilder.putInt(data.remaining()); // Uncompressed length
        chunkSectionBuilder.putLong(System.nanoTime()); // Timestamp
        chunkSectionBuilder.putLong(System.currentTimeMillis()); // Timestamp
        chunkSectionBuilder.putInt(xxHash32OfData); // xxHash32 of the original data
        chunkSectionBuilder.put(compressedData); // Compressed data
        chunkSectionBuilder.flip();

        this.writeChunkDataRaw(chunkIndex, chunkSectionBuilder);
        BufferReleaser.clean(chunkSectionBuilder);
    }

    private @Nullable ByteBuffer readChunk(int x, int z) throws IOException {
@@ -297,17 +492,15 @@ public class BufferedRegionFile implements IRegionFile {
            return null;
        }

        final int uncompressedLength = compressed.getInt(); // compressed length
        final int uncompressedLength = compressed.getInt();
        final long timestamp = compressed.getLong(); // TODO use this timestamp for something?
        final int dataXXHash32 = compressed.getInt(); // XXHash32 for validation
        final int dataXXHash32 = compressed.getInt();

        final ByteBuffer decompressed = this.decompress(this.ensureDirectBuffer(compressed), uncompressedLength);

        BufferReleaser.clean(compressed);

        final IOException xxHash32CheckFailedEx = this.checkXXHash32(dataXXHash32, decompressed);
        if (xxHash32CheckFailedEx != null) {
            throw xxHash32CheckFailedEx; // prevent from loading
            throw xxHash32CheckFailedEx;
        }

        return decompressed;
@@ -437,10 +630,8 @@ public class BufferedRegionFile implements IRegionFile {
            final byte[] dataBytes = new byte[data.remaining()];
            data.get(dataBytes);

            BufferReleaser.clean(data);

            return new DataInputStream(new ByteArrayInputStream(dataBytes));
        }finally {
        } finally {
            this.fileAccessLock.readLock().unlock();
        }
    }
@@ -450,7 +641,7 @@ public class BufferedRegionFile implements IRegionFile {
        this.fileAccessLock.readLock().lock();
        try {
            return this.hasData(getChunkIndex(pos.x, pos.z));
        }finally {
        } finally {
            this.fileAccessLock.readLock().unlock();
        }
    }
@@ -465,7 +656,7 @@ public class BufferedRegionFile implements IRegionFile {
        this.fileAccessLock.writeLock().lock();
        try {
            this.clearChunkData(getChunkIndex(pos.x, pos.z));
        }finally {
        } finally {
            this.fileAccessLock.writeLock().unlock();
        }
    }
@@ -515,7 +706,6 @@ public class BufferedRegionFile implements IRegionFile {
    public int getRecalculateCount() {
        return this.recalculateCount.get();
    }
    // MCC end

    @Override
    public MoonriseRegionFileIO.RegionDataController.WriteData moonrise$startWrite(CompoundTag data, ChunkPos pos) {
@@ -531,8 +721,21 @@ public class BufferedRegionFile implements IRegionFile {
    public void flush() throws IOException {
        this.fileAccessLock.writeLock().lock();
        try {
            this.flushInternal();
        }finally {
            if ((boolean) SYNCED_HANDLE.get(this)) {
                return;
            }

            if (!BEING_SYNCED_HANDLE.compareAndSet(this, false, true)) {
                return;
            }

            try {
                this.flushInternal();
                SYNCED_HANDLE.set(this, true);
            } finally {
                BEING_SYNCED_HANDLE.set(this, false);
            }
        } finally {
            this.fileAccessLock.writeLock().unlock();
        }
    }
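The reworked flush() above uses VarHandle-backed flags so that a flush is skipped when the file is already synced and only one thread performs the sync at a time. A minimal standalone version of that guard, using the standard MethodHandles lookup instead of the ConcurrentUtil helper and with an illustrative class name, might look like this:

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

final class FlushGuardSketch {
    private volatile boolean synced = true;
    private volatile boolean beingSynced = false;

    private static final VarHandle SYNCED;
    private static final VarHandle BEING_SYNCED;

    static {
        try {
            MethodHandles.Lookup lookup = MethodHandles.lookup();
            SYNCED = lookup.findVarHandle(FlushGuardSketch.class, "synced", boolean.class);
            BEING_SYNCED = lookup.findVarHandle(FlushGuardSketch.class, "beingSynced", boolean.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    void markDirty() {
        SYNCED.set(this, false);
    }

    // Returns without doing work if already synced, or if another thread is
    // mid-sync; otherwise performs the sync exactly once and clears the flag.
    void flush() {
        if ((boolean) SYNCED.get(this)) {
            return;
        }
        if (!BEING_SYNCED.compareAndSet(this, false, true)) {
            return;
        }
        try {
            // ... write headers / force the channel here ...
            SYNCED.set(this, true);
        } finally {
            BEING_SYNCED.set(this, false);
        }
    }
}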
@@ -541,8 +744,9 @@ public class BufferedRegionFile implements IRegionFile {
    public void close() throws IOException {
        this.fileAccessLock.writeLock().lock();
        try {
            removeFromFlusherManagement();
            this.closeInternal();
        }finally {
        } finally {
            this.fileAccessLock.writeLock().unlock();
        }
    }
@@ -577,7 +781,7 @@ public class BufferedRegionFile implements IRegionFile {

            long offset = this.offset;
            while (newData.hasRemaining()) {
                offset = channel.write(newData, offset);
                offset += channel.write(newData, offset);
            }
        }
@@ -611,7 +815,6 @@ public class BufferedRegionFile implements IRegionFile {
        }

        static int sizeOfSingle() {
            // offset length hasData
            return Long.BYTES * 2 + 1;
        }
    }
@@ -630,7 +833,7 @@ public class BufferedRegionFile implements IRegionFile {
                ByteBuffer bytebuffer = ByteBuffer.wrap(this.buf, 0, this.count);

                BufferedRegionFile.this.writeChunk(this.pos.x, this.pos.z, bytebuffer);
            }finally {
            } finally {
                BufferedRegionFile.this.fileAccessLock.writeLock().unlock();
            }
        }