Use a queue & a single thread

This commit is contained in:
Sofiane H. Djerbi
2023-08-03 01:31:04 +02:00
parent dd213a0c42
commit a8974f29a5

View File

@@ -39,14 +39,14 @@ index f2c27e0ac65be4b75c1d86ef6fd45fdb538d96ac..00724993d0448454d14a47652b039b88
public static final class InProgressWrite {
public long writeCounter;
diff --git a/src/main/java/dev/kaiijumc/kaiiju/KaiijuConfig.java b/src/main/java/dev/kaiijumc/kaiiju/KaiijuConfig.java
index f08bcc9ae1770fa847d8a5e873a554bef5485100..1b15a248a54ef9e8ea1c11fa33b8a8e7a4a614c7 100644
index f08bcc9ae1770fa847d8a5e873a554bef5485100..073f386bb2c95969ea2f5172cb41191b7e41a272 100644
--- a/src/main/java/dev/kaiijumc/kaiiju/KaiijuConfig.java
+++ b/src/main/java/dev/kaiijumc/kaiiju/KaiijuConfig.java
@@ -193,6 +193,9 @@ public class KaiijuConfig {
return builder.build();
}
+ public static int linearFlushFrequency = 1000;
+ public static int linearFlushFrequency = 10;
+
private static void regionFormatSettings() {
+ linearFlushFrequency = getInt("region-format.linear.flush-frequency", linearFlushFrequency);
@@ -126,10 +126,10 @@ index 0000000000000000000000000000000000000000..dcfbabf54b19a4c29d5c95830242c5c2
+}
diff --git a/src/main/java/dev/kaiijumc/kaiiju/region/LinearRegionFile.java b/src/main/java/dev/kaiijumc/kaiiju/region/LinearRegionFile.java
new file mode 100644
index 0000000000000000000000000000000000000000..ff48e2eaf1cb89d877bd344044338c437ad03d7d
index 0000000000000000000000000000000000000000..02208f179c5b5bc31790c8d1b6e902a46bd79962
--- /dev/null
+++ b/src/main/java/dev/kaiijumc/kaiiju/region/LinearRegionFile.java
@@ -0,0 +1,322 @@
@@ -0,0 +1,324 @@
+package dev.kaiijumc.kaiiju.region;
+
+import com.github.luben.zstd.ZstdInputStream;
@@ -174,18 +174,16 @@ index 0000000000000000000000000000000000000000..ff48e2eaf1cb89d877bd344044338c43
+ private final LZ4FastDecompressor decompressor;
+
+ public final ReentrantLock fileLock = new ReentrantLock(true);
+ public final ReentrantLock internalFileLock = new ReentrantLock(true);
+ private final int compressionLevel;
+
+ private AtomicBoolean markedToSave = new AtomicBoolean(false);
+ public boolean close = false;
+ public boolean closed = false;
+ public Path path;
+
+
+ public LinearRegionFile(Path file, int compression) throws IOException {
+ this.path = file;
+ this.compressionLevel = compression;
+ linearRegionFileFlusher.register(this);
+ this.compressor = LZ4Factory.fastestInstance().fastCompressor();
+ this.decompressor = LZ4Factory.fastestInstance().fastDecompressor();
+
@@ -262,6 +260,7 @@ index 0000000000000000000000000000000000000000..ff48e2eaf1cb89d877bd344044338c43
+ }
+
+ private void markToSave() {
+ linearRegionFileFlusher.scheduleSave(this);
+ markedToSave.set(true);
+ }
+
@@ -271,12 +270,9 @@ index 0000000000000000000000000000000000000000..ff48e2eaf1cb89d877bd344044338c43
+
+ public void rawFlush() {
+ try {
+ internalFileLock.lock();
+ save();
+ } catch (IOException e) {
+ LOGGER.error("Failed to flush region file " + path.toAbsolutePath(), e);
+ } finally {
+ internalFileLock.unlock();
+ }
+ }
+
@@ -284,7 +280,7 @@ index 0000000000000000000000000000000000000000..ff48e2eaf1cb89d877bd344044338c43
+ throw new Exception("doesChunkExist is a stub");
+ }
+
+ private void save() throws IOException {
+ private synchronized void save() throws IOException {
+ long timestamp = getTimestamp();
+ short chunkCount = 0;
+
@@ -423,11 +419,17 @@ index 0000000000000000000000000000000000000000..ff48e2eaf1cb89d877bd344044338c43
+ return this.bufferUncompressedSize[getChunkIndex(pos.x, pos.z)] > 0;
+ }
+
+ public synchronized void close() throws IOException {
+ if (close) return;
+ close = true;
+ linearRegionFileFlusher.remove(this);
+ flush(); // sync
+ public void close() throws IOException {
+ if (closed) return;
+ fileLock.lock();
+ synchronized (this) {
+ try {
+ closed = true;
+ flush(); // sync
+ } finally {
+ fileLock.unlock();
+ }
+ }
+ }
+
+ private static int getChunkIndex(int x, int z) {
@@ -454,26 +456,26 @@ index 0000000000000000000000000000000000000000..ff48e2eaf1cb89d877bd344044338c43
+}
diff --git a/src/main/java/dev/kaiijumc/kaiiju/region/LinearRegionFileFlusher.java b/src/main/java/dev/kaiijumc/kaiiju/region/LinearRegionFileFlusher.java
new file mode 100644
index 0000000000000000000000000000000000000000..21e7a5f70422a2ff0b7425d01ef55bf7c81b36d2
index 0000000000000000000000000000000000000000..a43b00598490824205064422f188ae5a0778bdb9
--- /dev/null
+++ b/src/main/java/dev/kaiijumc/kaiiju/region/LinearRegionFileFlusher.java
@@ -0,0 +1,47 @@
@@ -0,0 +1,45 @@
+package dev.kaiijumc.kaiiju.region;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.Set;
+import java.util.Queue;
+import java.util.concurrent.*;
+import dev.kaiijumc.kaiiju.KaiijuConfig;
+
+public class LinearRegionFileFlusher {
+ private final Set<LinearRegionFile> registeredRegionFiles = ConcurrentHashMap.newKeySet();
+ private final Queue<LinearRegionFile> savingQueue = new LinkedBlockingQueue<>();
+ private final ScheduledExecutorService scheduler;
+ private final ExecutorService executor;
+
+ public LinearRegionFileFlusher() {
+ executor = Executors.newCachedThreadPool(
+ executor = Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder()
+ .setNameFormat("linear-flusher-thread-%d")
+ .setNameFormat("linear-flusher-thread")
+ .build()
+ );
+ scheduler = Executors.newSingleThreadScheduledExecutor(
@@ -481,23 +483,21 @@ index 0000000000000000000000000000000000000000..21e7a5f70422a2ff0b7425d01ef55bf7
+ .setNameFormat("linear-flush-scheduler")
+ .build()
+ );
+ scheduler.scheduleAtFixedRate(this::createAndFlush, 0L, KaiijuConfig.linearFlushFrequency, TimeUnit.MILLISECONDS);
+ scheduler.scheduleAtFixedRate(this::pollAndFlush, 0L, KaiijuConfig.linearFlushFrequency, TimeUnit.SECONDS);
+ }
+
+ public void remove(LinearRegionFile regionFile) {
+ registeredRegionFiles.remove(regionFile);
+ public void scheduleSave(LinearRegionFile regionFile) {
+ if (savingQueue.contains(regionFile)) return;
+ savingQueue.add(regionFile);
+ }
+
+ public void register(LinearRegionFile regionFile) {
+ registeredRegionFiles.add(regionFile);
+ }
+
+ private void createAndFlush() {
+ registeredRegionFiles.forEach(regionFile -> {
+ if (!regionFile.close && regionFile.isMarkedToSave()) {
+ private void pollAndFlush() {
+ while (!savingQueue.isEmpty()) {
+ LinearRegionFile regionFile = savingQueue.poll();
+ if (!regionFile.closed && regionFile.isMarkedToSave()) {
+ executor.execute(regionFile::rawFlush);
+ }
+ });
+ }
+ }
+
+ public void shutdown() {