mirror of
https://github.com/Winds-Studio/Leaf.git
synced 2025-12-25 01:49:16 +00:00
Refactor async entity tracker (#390)
* prevents async entity tracker update equipment * fix seenBy updated check * skip submit empty * fix invertedVisibilityEntities data race * strict thread check * set max-threads to 1 by default * use fixed thread count * increase thread priority * Revert "use fixed thread count" This reverts commit6746bc25a8. * Revert "set max-threads to 1 by default" This reverts commit5295b6d3e1. * update entity tracker * cleanup * [ci skip] fix phrasing * cleanup * cleanup * support Citizens * optimize update if chunk player no change * configurable threads * configurable no blocking * fix pos y and z * optimize no blocking * cleanup * cleanup * add handle during waitUntilNextTick * fix entity disappear * cleanup * disable nonblocking by default * [ci skip] add entity slice * impl fork-join * fix async locator diff * optimize queue * inline iterator * [ci skip] Update patch header * cleanup * improve compatibility * add license header * optimize spin wait * remove queue-size option * dynamic adjust subtasks --------- Co-authored-by: Taiyou06 <kaandindar21@gmail.com> Co-authored-by: Dreeam <61569423+Dreeam-qwq@users.noreply.github.com>
This commit is contained in:
@@ -0,0 +1,85 @@
|
||||
package org.dreeam.leaf.async;
|
||||
|
||||
import net.minecraft.Util;
|
||||
import org.dreeam.leaf.util.queue.MpmcQueue;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.FutureTask;
|
||||
import java.util.concurrent.locks.LockSupport;
|
||||
|
||||
public final class FixedThreadExecutor {
|
||||
private final Thread[] workers;
|
||||
private final MpmcQueue<Runnable> channel;
|
||||
private static volatile boolean SHUTDOWN = false;
|
||||
|
||||
public FixedThreadExecutor(int numThreads, int queue, String prefix) {
|
||||
if (numThreads <= 0) {
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
this.workers = new Thread[numThreads];
|
||||
this.channel = new MpmcQueue<>(Runnable.class, queue);
|
||||
for (int i = 0; i < numThreads; i++) {
|
||||
workers[i] = Thread.ofPlatform()
|
||||
.uncaughtExceptionHandler(Util::onThreadException)
|
||||
.daemon(false)
|
||||
.priority(Thread.NORM_PRIORITY)
|
||||
.name(prefix + " - " + i)
|
||||
.start(new Worker(channel));
|
||||
}
|
||||
}
|
||||
|
||||
public <T> FutureTask<T> submitOrRun(Callable<T> task) {
|
||||
if (SHUTDOWN) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
final FutureTask<T> t = new FutureTask<>(task);
|
||||
if (!channel.send(t)) {
|
||||
t.run();
|
||||
}
|
||||
return t;
|
||||
}
|
||||
|
||||
public void unpack() {
|
||||
int size = Math.min(Math.max(1, channel.length()), workers.length);
|
||||
for (int i = 0; i < size; i++) {
|
||||
LockSupport.unpark(workers[i]);
|
||||
}
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
SHUTDOWN = true;
|
||||
for (final Thread worker : workers) {
|
||||
LockSupport.unpark(worker);
|
||||
}
|
||||
}
|
||||
|
||||
public void join(long timeoutMillis) throws InterruptedException {
|
||||
final long startTime = System.currentTimeMillis();
|
||||
|
||||
for (final Thread worker : workers) {
|
||||
final long remaining = timeoutMillis - System.currentTimeMillis() + startTime;
|
||||
if (remaining <= 0) {
|
||||
return;
|
||||
}
|
||||
worker.join(remaining);
|
||||
if (worker.isAlive()) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private record Worker(MpmcQueue<Runnable> channel) implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
while (!SHUTDOWN) {
|
||||
final Runnable task = channel.recv();
|
||||
if (task != null) {
|
||||
task.run();
|
||||
} else if (!SHUTDOWN) {
|
||||
LockSupport.park();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -5,7 +5,7 @@ import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.dreeam.leaf.async.ai.AsyncGoalThread;
|
||||
import org.dreeam.leaf.async.path.AsyncPathProcessor;
|
||||
import org.dreeam.leaf.async.tracker.MultithreadedTracker;
|
||||
import org.dreeam.leaf.async.tracker.AsyncTracker;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@@ -40,11 +40,11 @@ public class ShutdownExecutors {
|
||||
}
|
||||
}
|
||||
|
||||
if (MultithreadedTracker.TRACKER_EXECUTOR != null) {
|
||||
LOGGER.info("Waiting for mob tracker executor to shutdown...");
|
||||
MultithreadedTracker.TRACKER_EXECUTOR.shutdown();
|
||||
if (AsyncTracker.TRACKER_EXECUTOR != null) {
|
||||
LOGGER.info("Waiting for entity tracker executor to shutdown...");
|
||||
AsyncTracker.TRACKER_EXECUTOR.shutdown();
|
||||
try {
|
||||
MultithreadedTracker.TRACKER_EXECUTOR.awaitTermination(10L, TimeUnit.SECONDS);
|
||||
AsyncTracker.TRACKER_EXECUTOR.join(10_000L);
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,91 @@
|
||||
package org.dreeam.leaf.async.tracker;
|
||||
|
||||
import ca.spottedleaf.moonrise.patches.chunk_system.level.entity.server.ServerEntityLookup;
|
||||
import net.minecraft.server.MinecraftServer;
|
||||
import net.minecraft.server.level.ServerLevel;
|
||||
import net.minecraft.world.entity.Entity;
|
||||
import org.dreeam.leaf.async.FixedThreadExecutor;
|
||||
import org.dreeam.leaf.config.modules.async.MultithreadedTracker;
|
||||
import org.dreeam.leaf.util.EntitySlice;
|
||||
|
||||
import java.util.concurrent.*;
|
||||
|
||||
public final class AsyncTracker {
|
||||
private static final String THREAD_NAME = "Leaf Async Tracker Thread";
|
||||
public static final boolean ENABLED = MultithreadedTracker.enabled;
|
||||
public static final int QUEUE = 1024;
|
||||
public static final int MIN_CHUNK = 16;
|
||||
public static final int THREADS = MultithreadedTracker.threads;
|
||||
public static final FixedThreadExecutor TRACKER_EXECUTOR = ENABLED ? new FixedThreadExecutor(
|
||||
THREADS,
|
||||
QUEUE,
|
||||
THREAD_NAME
|
||||
) : null;
|
||||
|
||||
private AsyncTracker() {
|
||||
}
|
||||
|
||||
public static void init() {
|
||||
if (TRACKER_EXECUTOR == null || !ENABLED) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
|
||||
public static void tick(ServerLevel world) {
|
||||
ServerEntityLookup entityLookup = (ServerEntityLookup) world.moonrise$getEntityLookup();
|
||||
ca.spottedleaf.moonrise.common.list.ReferenceList<Entity> trackerEntities = entityLookup.trackerEntities;
|
||||
int trackerEntitiesSize = trackerEntities.size();
|
||||
if (trackerEntitiesSize == 0) {
|
||||
return;
|
||||
}
|
||||
Entity[] trackerEntitiesRaw = trackerEntities.getRawDataUnchecked();
|
||||
Entity[] entities = new Entity[trackerEntitiesSize];
|
||||
System.arraycopy(trackerEntitiesRaw, 0, entities, 0, trackerEntitiesSize);
|
||||
EntitySlice slice = new EntitySlice(entities);
|
||||
EntitySlice[] slices = entities.length <= THREADS * MIN_CHUNK ? slice.chunks(MIN_CHUNK) : slice.splitEvenly(THREADS);
|
||||
@SuppressWarnings("unchecked")
|
||||
Future<TrackerCtx>[] futures = new Future[slices.length];
|
||||
for (int i = 0; i < futures.length; i++) {
|
||||
futures[i] = TRACKER_EXECUTOR.submitOrRun(new TrackerTask(world, slices[i]));
|
||||
}
|
||||
TRACKER_EXECUTOR.unpack();
|
||||
world.trackerTask = futures;
|
||||
}
|
||||
|
||||
public static void onEntitiesTickEnd(ServerLevel world) {
|
||||
Future<TrackerCtx>[] task = world.trackerTask;
|
||||
if (task == null) {
|
||||
return;
|
||||
}
|
||||
for (Future<TrackerCtx> fut : task) {
|
||||
if (!fut.isDone()) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
handle(world, task);
|
||||
}
|
||||
|
||||
public static void onTickEnd(MinecraftServer server) {
|
||||
for (ServerLevel world : server.getAllLevels()) {
|
||||
Future<TrackerCtx>[] task = world.trackerTask;
|
||||
if (task != null) {
|
||||
handle(world, task);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void handle(ServerLevel world, Future<TrackerCtx>[] futures) {
|
||||
try {
|
||||
TrackerCtx ctx = futures[0].get();
|
||||
for (int i = 1; i < futures.length; i++) {
|
||||
ctx.join(futures[i].get());
|
||||
}
|
||||
world.trackerTask = null;
|
||||
ctx.handle();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (ExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,211 +0,0 @@
|
||||
package org.dreeam.leaf.async.tracker;
|
||||
|
||||
import ca.spottedleaf.moonrise.common.list.ReferenceList;
|
||||
import ca.spottedleaf.moonrise.common.misc.NearbyPlayers;
|
||||
import ca.spottedleaf.moonrise.patches.chunk_system.level.entity.server.ServerEntityLookup;
|
||||
import ca.spottedleaf.moonrise.patches.entity_tracker.EntityTrackerEntity;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import net.minecraft.Util;
|
||||
import net.minecraft.server.level.ChunkMap;
|
||||
import net.minecraft.server.level.FullChunkStatus;
|
||||
import net.minecraft.server.level.ServerLevel;
|
||||
import net.minecraft.world.entity.Entity;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.RejectedExecutionHandler;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class MultithreadedTracker {
|
||||
|
||||
private static final String THREAD_PREFIX = "Leaf Async Tracker";
|
||||
private static final Logger LOGGER = LogManager.getLogger(THREAD_PREFIX);
|
||||
private static long lastWarnMillis = System.currentTimeMillis();
|
||||
public static ThreadPoolExecutor TRACKER_EXECUTOR = null;
|
||||
|
||||
private MultithreadedTracker() {
|
||||
}
|
||||
|
||||
public static void init() {
|
||||
if (TRACKER_EXECUTOR == null) {
|
||||
TRACKER_EXECUTOR = new ThreadPoolExecutor(
|
||||
getCorePoolSize(),
|
||||
getMaxPoolSize(),
|
||||
getKeepAliveTime(), TimeUnit.SECONDS,
|
||||
getQueueImpl(),
|
||||
getThreadFactory(),
|
||||
getRejectedPolicy()
|
||||
);
|
||||
} else {
|
||||
// Temp no-op
|
||||
//throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
|
||||
public static void tick(ServerLevel level) {
|
||||
try {
|
||||
if (!org.dreeam.leaf.config.modules.async.MultithreadedTracker.compatModeEnabled) {
|
||||
tickAsync(level);
|
||||
} else {
|
||||
tickAsyncWithCompatMode(level);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Error occurred while executing async task.", e);
|
||||
}
|
||||
}
|
||||
|
||||
private static void tickAsync(ServerLevel level) {
|
||||
final NearbyPlayers nearbyPlayers = level.moonrise$getNearbyPlayers();
|
||||
final ServerEntityLookup entityLookup = (ServerEntityLookup) level.moonrise$getEntityLookup();
|
||||
|
||||
final ReferenceList<Entity> trackerEntities = entityLookup.trackerEntities;
|
||||
final int trackerEntitiesSize = trackerEntities.size();
|
||||
final Entity[] trackerEntitiesRaw = trackerEntities.getRawDataUnchecked();
|
||||
|
||||
// Move tracking to off-main
|
||||
TRACKER_EXECUTOR.execute(() -> {
|
||||
for (int i = 0; i < trackerEntitiesSize; i++) {
|
||||
Entity entity = trackerEntitiesRaw[i];
|
||||
if (entity == null) continue;
|
||||
|
||||
final ChunkMap.TrackedEntity tracker = ((EntityTrackerEntity) entity).moonrise$getTrackedEntity();
|
||||
|
||||
if (tracker == null) continue;
|
||||
|
||||
synchronized (tracker) {
|
||||
NearbyPlayers.TrackedChunk trackedChunk = nearbyPlayers.getChunk(entity.chunkPosition());
|
||||
tracker.moonrise$tick(trackedChunk);
|
||||
tracker.serverEntity.sendChanges();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private static void tickAsyncWithCompatMode(ServerLevel level) {
|
||||
final NearbyPlayers nearbyPlayers = level.moonrise$getNearbyPlayers();
|
||||
final ServerEntityLookup entityLookup = (ServerEntityLookup) level.moonrise$getEntityLookup();
|
||||
|
||||
final ReferenceList<Entity> trackerEntities = entityLookup.trackerEntities;
|
||||
final Entity[] trackerEntitiesRaw = trackerEntities.getRawDataUnchecked();
|
||||
final int trackerEntitiesSize = trackerEntities.size();
|
||||
final Runnable[] sendChangesTasks = new Runnable[trackerEntitiesSize];
|
||||
final Runnable[] tickTask = new Runnable[trackerEntitiesSize];
|
||||
int index = 0;
|
||||
|
||||
for (int i = 0; i < trackerEntitiesSize; i++) {
|
||||
Entity entity = trackerEntitiesRaw[i];
|
||||
if (entity == null) continue;
|
||||
|
||||
final ChunkMap.TrackedEntity tracker = ((EntityTrackerEntity) entity).moonrise$getTrackedEntity();
|
||||
|
||||
if (tracker == null) continue;
|
||||
|
||||
synchronized (tracker) {
|
||||
tickTask[index] = tracker.leafTickCompact(nearbyPlayers.getChunk(entity.chunkPosition()));
|
||||
sendChangesTasks[index] = tracker.serverEntity::sendChanges; // Collect send changes to task array
|
||||
}
|
||||
index++;
|
||||
}
|
||||
|
||||
// batch submit tasks
|
||||
TRACKER_EXECUTOR.execute(() -> {
|
||||
for (final Runnable tick : tickTask) {
|
||||
if (tick == null) continue;
|
||||
|
||||
tick.run();
|
||||
}
|
||||
for (final Runnable sendChanges : sendChangesTasks) {
|
||||
if (sendChanges == null) continue;
|
||||
|
||||
sendChanges.run();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Original ChunkMap#newTrackerTick of Paper
|
||||
// Just for diff usage for future update
|
||||
private static void tickOriginal(ServerLevel level) {
|
||||
final ca.spottedleaf.moonrise.patches.chunk_system.level.entity.server.ServerEntityLookup entityLookup = (ca.spottedleaf.moonrise.patches.chunk_system.level.entity.server.ServerEntityLookup) ((ca.spottedleaf.moonrise.patches.chunk_system.level.ChunkSystemServerLevel) level).moonrise$getEntityLookup();
|
||||
|
||||
final ca.spottedleaf.moonrise.common.list.ReferenceList<net.minecraft.world.entity.Entity> trackerEntities = entityLookup.trackerEntities;
|
||||
final Entity[] trackerEntitiesRaw = trackerEntities.getRawDataUnchecked();
|
||||
for (int i = 0, len = trackerEntities.size(); i < len; ++i) {
|
||||
final Entity entity = trackerEntitiesRaw[i];
|
||||
final ChunkMap.TrackedEntity tracker = ((ca.spottedleaf.moonrise.patches.entity_tracker.EntityTrackerEntity) entity).moonrise$getTrackedEntity();
|
||||
if (tracker == null) {
|
||||
continue;
|
||||
}
|
||||
((ca.spottedleaf.moonrise.patches.entity_tracker.EntityTrackerTrackedEntity) tracker).moonrise$tick(((ca.spottedleaf.moonrise.patches.chunk_system.entity.ChunkSystemEntity) entity).moonrise$getChunkData().nearbyPlayers);
|
||||
if (((ca.spottedleaf.moonrise.patches.entity_tracker.EntityTrackerTrackedEntity) tracker).moonrise$hasPlayers()
|
||||
|| ((ca.spottedleaf.moonrise.patches.chunk_system.entity.ChunkSystemEntity) entity).moonrise$getChunkStatus().isOrAfter(FullChunkStatus.ENTITY_TICKING)) {
|
||||
tracker.serverEntity.sendChanges();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static int getCorePoolSize() {
|
||||
return 1;
|
||||
}
|
||||
|
||||
private static int getMaxPoolSize() {
|
||||
return org.dreeam.leaf.config.modules.async.MultithreadedTracker.asyncEntityTrackerMaxThreads;
|
||||
}
|
||||
|
||||
private static long getKeepAliveTime() {
|
||||
return org.dreeam.leaf.config.modules.async.MultithreadedTracker.asyncEntityTrackerKeepalive;
|
||||
}
|
||||
|
||||
private static BlockingQueue<Runnable> getQueueImpl() {
|
||||
final int queueCapacity = org.dreeam.leaf.config.modules.async.MultithreadedTracker.asyncEntityTrackerQueueSize;
|
||||
|
||||
return new LinkedBlockingQueue<>(queueCapacity);
|
||||
}
|
||||
|
||||
private static @NotNull ThreadFactory getThreadFactory() {
|
||||
return new ThreadFactoryBuilder()
|
||||
.setThreadFactory(MultithreadedTrackerThread::new)
|
||||
.setNameFormat(THREAD_PREFIX + " Thread - %d")
|
||||
.setPriority(Thread.NORM_PRIORITY - 2)
|
||||
.setUncaughtExceptionHandler(Util::onThreadException)
|
||||
.build();
|
||||
}
|
||||
|
||||
private static @NotNull RejectedExecutionHandler getRejectedPolicy() {
|
||||
return (rejectedTask, executor) -> {
|
||||
BlockingQueue<Runnable> workQueue = executor.getQueue();
|
||||
|
||||
if (!executor.isShutdown()) {
|
||||
if (!workQueue.isEmpty()) {
|
||||
List<Runnable> pendingTasks = new ArrayList<>(workQueue.size());
|
||||
|
||||
workQueue.drainTo(pendingTasks);
|
||||
|
||||
for (Runnable pendingTask : pendingTasks) {
|
||||
pendingTask.run();
|
||||
}
|
||||
}
|
||||
|
||||
rejectedTask.run();
|
||||
}
|
||||
|
||||
if (System.currentTimeMillis() - lastWarnMillis > 30000L) {
|
||||
LOGGER.warn("Async entity tracker is busy! Tracking tasks will be done in the server thread. Increasing max-threads in Leaf config may help.");
|
||||
lastWarnMillis = System.currentTimeMillis();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public static class MultithreadedTrackerThread extends Thread {
|
||||
|
||||
public MultithreadedTrackerThread(Runnable runnable) {
|
||||
super(runnable);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,274 @@
|
||||
package org.dreeam.leaf.async.tracker;
|
||||
|
||||
import ca.spottedleaf.moonrise.common.misc.NearbyPlayers;
|
||||
import io.papermc.paper.event.player.PlayerTrackEntityEvent;
|
||||
import io.papermc.paper.event.player.PlayerUntrackEntityEvent;
|
||||
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
|
||||
import it.unimi.dsi.fastutil.objects.Reference2ReferenceOpenHashMap;
|
||||
import net.minecraft.network.protocol.Packet;
|
||||
import net.minecraft.network.protocol.game.ClientGamePacketListener;
|
||||
import net.minecraft.network.protocol.game.ClientboundBundlePacket;
|
||||
import net.minecraft.network.protocol.game.ClientboundRemoveEntitiesPacket;
|
||||
import net.minecraft.network.protocol.game.ClientboundSetEntityMotionPacket;
|
||||
import net.minecraft.server.level.ChunkMap;
|
||||
import net.minecraft.server.level.FullChunkStatus;
|
||||
import net.minecraft.server.level.ServerLevel;
|
||||
import net.minecraft.server.level.ServerPlayer;
|
||||
import net.minecraft.server.network.ServerGamePacketListenerImpl;
|
||||
import net.minecraft.server.network.ServerPlayerConnection;
|
||||
import net.minecraft.world.entity.Entity;
|
||||
import net.minecraft.world.entity.boss.wither.WitherBoss;
|
||||
import net.minecraft.world.entity.decoration.ItemFrame;
|
||||
import net.minecraft.world.item.ItemStack;
|
||||
import net.minecraft.world.item.MapItem;
|
||||
import net.minecraft.world.level.saveddata.maps.MapId;
|
||||
import net.minecraft.world.level.saveddata.maps.MapItemSavedData;
|
||||
import org.bukkit.event.player.PlayerVelocityEvent;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
public final class TrackerCtx {
|
||||
private static final int SIZE_LIMIT_PER_BUNDLE = 4096;
|
||||
|
||||
private final Reference2ReferenceOpenHashMap<ServerPlayerConnection, ObjectArrayList<Packet<? super ClientGamePacketListener>>> packets;
|
||||
private final ServerLevel world;
|
||||
private final ObjectArrayList<ServerPlayer> bukkitVelocityEvent = new ObjectArrayList<>();
|
||||
private final ObjectArrayList<ItemFrame> bukkitItemFrames = new ObjectArrayList<>();
|
||||
private final ObjectArrayList<BossEvent> witherBosses = new ObjectArrayList<>();
|
||||
private final ObjectArrayList<PaperStopSeen> paperStopSeen = new ObjectArrayList<>();
|
||||
private final ObjectArrayList<PaperStartSeen> paperStartSeen = new ObjectArrayList<>();
|
||||
private final ObjectArrayList<Entity> pluginEntity = new ObjectArrayList<>();
|
||||
|
||||
private record BossEvent(WitherBoss witherBoss, ObjectArrayList<ServerPlayer> add, ObjectArrayList<ServerPlayer> remove) {}
|
||||
private record PaperStopSeen(Entity e, ObjectArrayList<ServerPlayerConnection> q) {}
|
||||
private record PaperStartSeen(Entity e, ObjectArrayList<ServerPlayerConnection> q) {}
|
||||
|
||||
public TrackerCtx(ServerLevel world) {
|
||||
this.packets = new Reference2ReferenceOpenHashMap<>();
|
||||
this.world = world;
|
||||
}
|
||||
|
||||
public void stopSeenByPlayer(ServerPlayerConnection connection, Entity entity) {
|
||||
if (PlayerUntrackEntityEvent.getHandlerList().getRegisteredListeners().length != 0) {
|
||||
if (paperStopSeen.isEmpty()) {
|
||||
paperStopSeen.add(new PaperStopSeen(entity, new ObjectArrayList<>()));
|
||||
}
|
||||
if (!paperStopSeen.getLast().e.equals(entity)) {
|
||||
paperStopSeen.add(new PaperStopSeen(entity, new ObjectArrayList<>()));
|
||||
}
|
||||
paperStopSeen.getLast().q.add(connection);
|
||||
}
|
||||
if (entity instanceof WitherBoss witherBoss) {
|
||||
if (witherBosses.isEmpty()) {
|
||||
witherBosses.add(new BossEvent(witherBoss, new ObjectArrayList<>(), new ObjectArrayList<>()));
|
||||
}
|
||||
if (!witherBosses.getLast().witherBoss.equals(witherBoss)) {
|
||||
witherBosses.add(new BossEvent(witherBoss, new ObjectArrayList<>(), new ObjectArrayList<>()));
|
||||
}
|
||||
witherBosses.getLast().remove.add(connection.getPlayer());
|
||||
}
|
||||
}
|
||||
|
||||
public void startSeenByPlayer(ServerPlayerConnection connection, Entity entity) {
|
||||
if (PlayerTrackEntityEvent.getHandlerList().getRegisteredListeners().length != 0) {
|
||||
if (paperStartSeen.isEmpty()) {
|
||||
paperStartSeen.add(new PaperStartSeen(entity, new ObjectArrayList<>()));
|
||||
}
|
||||
if (!paperStartSeen.getLast().e.equals(entity)) {
|
||||
paperStartSeen.add(new PaperStartSeen(entity, new ObjectArrayList<>()));
|
||||
}
|
||||
paperStartSeen.getLast().q.add(connection);
|
||||
}
|
||||
if (entity instanceof WitherBoss witherBoss) {
|
||||
if (witherBosses.isEmpty()) {
|
||||
witherBosses.add(new BossEvent(witherBoss, new ObjectArrayList<>(), new ObjectArrayList<>()));
|
||||
}
|
||||
if (!witherBosses.getLast().witherBoss.equals(witherBoss)) {
|
||||
witherBosses.add(new BossEvent(witherBoss, new ObjectArrayList<>(), new ObjectArrayList<>()));
|
||||
}
|
||||
witherBosses.getLast().add.add(connection.getPlayer());
|
||||
}
|
||||
}
|
||||
|
||||
public void updateItemFrame(ItemFrame itemFrame) {
|
||||
bukkitItemFrames.add(itemFrame);
|
||||
}
|
||||
|
||||
public void playerVelocity(ServerPlayer player) {
|
||||
if (PlayerVelocityEvent.getHandlerList().getRegisteredListeners().length == 0) {
|
||||
player.hurtMarked = false;
|
||||
player.moonrise$getTrackedEntity().leafBroadcastAndSend(this, new ClientboundSetEntityMotionPacket(player));
|
||||
} else {
|
||||
bukkitVelocityEvent.add(player);
|
||||
}
|
||||
}
|
||||
|
||||
public void citizensEntity(Entity entity) {
|
||||
pluginEntity.add(entity);
|
||||
}
|
||||
|
||||
public void send(ServerPlayerConnection connection, Packet<? super ClientGamePacketListener> packet) {
|
||||
packets.computeIfAbsent(connection, x -> new ObjectArrayList<>()).add(packet);
|
||||
}
|
||||
|
||||
void join(TrackerCtx other) {
|
||||
bukkitVelocityEvent.addAll(other.bukkitVelocityEvent);
|
||||
bukkitItemFrames.addAll(other.bukkitItemFrames);
|
||||
paperStopSeen.addAll(other.paperStopSeen);
|
||||
paperStartSeen.addAll(other.paperStartSeen);
|
||||
pluginEntity.addAll(other.pluginEntity);
|
||||
if (other.packets.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
var iterator = other.packets.reference2ReferenceEntrySet().fastIterator();
|
||||
while (iterator.hasNext()) {
|
||||
var entry = iterator.next();
|
||||
packets.computeIfAbsent(entry.getKey(), x -> new ObjectArrayList<>()).addAll(entry.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
void handle() {
|
||||
handlePackets(world, packets);
|
||||
|
||||
if (!pluginEntity.isEmpty()) {
|
||||
for (final Entity entity : pluginEntity) {
|
||||
final ChunkMap.TrackedEntity tracker = ((ca.spottedleaf.moonrise.patches.entity_tracker.EntityTrackerEntity)entity).moonrise$getTrackedEntity();
|
||||
if (tracker == null) {
|
||||
continue;
|
||||
}
|
||||
NearbyPlayers.TrackedChunk trackedChunk = world.moonrise$getNearbyPlayers().getChunk(entity.chunkPosition());
|
||||
tracker.leafTick(this, trackedChunk);
|
||||
boolean flag = false;
|
||||
if (tracker.moonrise$hasPlayers()) {
|
||||
flag = true;
|
||||
} else {
|
||||
FullChunkStatus status = ((ca.spottedleaf.moonrise.patches.chunk_system.entity.ChunkSystemEntity) entity).moonrise$getChunkStatus();
|
||||
if (status != null && status.isOrAfter(FullChunkStatus.ENTITY_TICKING)) {
|
||||
flag = true;
|
||||
}
|
||||
}
|
||||
if (flag) {
|
||||
tracker.serverEntity.leafSendChanges(this, tracker);
|
||||
}
|
||||
}
|
||||
pluginEntity.clear();
|
||||
}
|
||||
if (!bukkitVelocityEvent.isEmpty()) {
|
||||
for (ServerPlayer player : bukkitVelocityEvent) {
|
||||
if (!world.equals(player.level())) {
|
||||
continue;
|
||||
}
|
||||
boolean cancelled = false;
|
||||
|
||||
org.bukkit.entity.Player player1 = player.getBukkitEntity();
|
||||
org.bukkit.util.Vector velocity = player1.getVelocity();
|
||||
|
||||
PlayerVelocityEvent event = new PlayerVelocityEvent(player1, velocity.clone());
|
||||
if (!event.callEvent()) {
|
||||
cancelled = true;
|
||||
} else if (!velocity.equals(event.getVelocity())) {
|
||||
player1.setVelocity(event.getVelocity());
|
||||
}
|
||||
if (!cancelled) {
|
||||
player.hurtMarked = false;
|
||||
ChunkMap.TrackedEntity trackedEntity = player.moonrise$getTrackedEntity();
|
||||
trackedEntity.leafBroadcast(this, new ClientboundSetEntityMotionPacket(player));
|
||||
}
|
||||
}
|
||||
bukkitVelocityEvent.clear();
|
||||
}
|
||||
if (!bukkitItemFrames.isEmpty()) {
|
||||
for (ItemFrame itemFrame : bukkitItemFrames) {
|
||||
MapId mapId = itemFrame.cachedMapId; // Paper - Perf: Cache map ids on item frames
|
||||
MapItemSavedData savedData = MapItem.getSavedData(mapId, world);
|
||||
if (savedData != null) {
|
||||
ChunkMap.TrackedEntity trackedEntity = itemFrame.moonrise$getTrackedEntity();
|
||||
if (trackedEntity != null) {
|
||||
ItemStack item = itemFrame.getItem();
|
||||
for (final net.minecraft.server.network.ServerPlayerConnection connection : trackedEntity.seenBy()) {
|
||||
final ServerPlayer serverPlayer = connection.getPlayer(); // Paper
|
||||
savedData.tickCarriedBy(serverPlayer, item);
|
||||
Packet<? super ClientGamePacketListener> updatePacket = (Packet<? super ClientGamePacketListener>) savedData.getUpdatePacket(mapId, serverPlayer);
|
||||
if (updatePacket != null) {
|
||||
send(serverPlayer.connection, updatePacket);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
bukkitItemFrames.clear();
|
||||
}
|
||||
if (!witherBosses.isEmpty()) {
|
||||
for (BossEvent witherBoss : witherBosses) {
|
||||
for (ServerPlayer player : witherBoss.add) {
|
||||
if (!world.equals(player.level())) {
|
||||
continue;
|
||||
}
|
||||
witherBoss.witherBoss.bossEvent.leafAddPlayer(this, player);
|
||||
}
|
||||
for (ServerPlayer player : witherBoss.remove) {
|
||||
witherBoss.witherBoss.bossEvent.leafRemovePlayer(this, player);
|
||||
}
|
||||
}
|
||||
witherBosses.clear();
|
||||
}
|
||||
if (!paperStartSeen.isEmpty()) {
|
||||
for (PaperStartSeen startSeen : paperStartSeen) {
|
||||
for (ServerPlayerConnection connection : startSeen.q) {
|
||||
if (!new PlayerTrackEntityEvent(
|
||||
connection.getPlayer().getBukkitEntity(),
|
||||
startSeen.e.getBukkitEntity()
|
||||
).callEvent()) {
|
||||
send(connection, new ClientboundRemoveEntitiesPacket(startSeen.e.getId()));
|
||||
}
|
||||
}
|
||||
}
|
||||
paperStartSeen.clear();
|
||||
}
|
||||
if (!paperStopSeen.isEmpty()) {
|
||||
for (PaperStopSeen stopSeen : paperStopSeen) {
|
||||
for (ServerPlayerConnection connection : stopSeen.q) {
|
||||
new PlayerUntrackEntityEvent(
|
||||
connection.getPlayer().getBukkitEntity(),
|
||||
stopSeen.e.getBukkitEntity()
|
||||
).callEvent();
|
||||
}
|
||||
}
|
||||
paperStopSeen.clear();
|
||||
}
|
||||
|
||||
handlePackets(world, packets);
|
||||
}
|
||||
|
||||
private static void handlePackets(ServerLevel world, Reference2ReferenceOpenHashMap<ServerPlayerConnection, ObjectArrayList<Packet<? super ClientGamePacketListener>>> packets) {
|
||||
if (packets.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
var iter = packets.reference2ReferenceEntrySet().fastIterator();
|
||||
while (iter.hasNext()) {
|
||||
var entry = iter.next();
|
||||
ServerPlayerConnection connection = entry.getKey();
|
||||
ObjectArrayList<Packet<? super ClientGamePacketListener>> list = entry.getValue();
|
||||
if (!world.equals(connection.getPlayer().level())) {
|
||||
continue;
|
||||
}
|
||||
int size = list.size();
|
||||
if (size > SIZE_LIMIT_PER_BUNDLE) {
|
||||
int from = 0;
|
||||
while (from < size) {
|
||||
int chunkLen = Math.min(SIZE_LIMIT_PER_BUNDLE, size - from);
|
||||
Packet<? super ClientGamePacketListener>[] chunk = new Packet[chunkLen];
|
||||
list.getElements(from, chunk, 0, chunkLen);
|
||||
connection.send(new ClientboundBundlePacket(Arrays.asList(chunk)));
|
||||
from += chunkLen;
|
||||
}
|
||||
} else {
|
||||
connection.send(new ClientboundBundlePacket(list));
|
||||
}
|
||||
if (connection instanceof ServerGamePacketListenerImpl conn) {
|
||||
conn.connection.flushChannel();
|
||||
}
|
||||
}
|
||||
packets.clear();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,56 @@
|
||||
package org.dreeam.leaf.async.tracker;
|
||||
|
||||
import ca.spottedleaf.moonrise.common.misc.NearbyPlayers;
|
||||
import net.minecraft.server.level.ChunkMap;
|
||||
import net.minecraft.server.level.FullChunkStatus;
|
||||
import net.minecraft.server.level.ServerLevel;
|
||||
import net.minecraft.world.entity.Entity;
|
||||
import org.dreeam.leaf.util.EntitySlice;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
public final class TrackerTask implements Callable<TrackerCtx> {
|
||||
public final ServerLevel world;
|
||||
private final EntitySlice entities;
|
||||
|
||||
public TrackerTask(ServerLevel world, EntitySlice trackerEntities) {
|
||||
this.world = world;
|
||||
this.entities = trackerEntities;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TrackerCtx call() throws Exception {
|
||||
NearbyPlayers nearbyPlayers = world.moonrise$getNearbyPlayers();
|
||||
TrackerCtx ctx = new TrackerCtx(this.world);
|
||||
final Entity[] raw = entities.array();
|
||||
for (int i = entities.start(); i < entities.end(); i++) {
|
||||
final Entity entity = raw[i];
|
||||
final ChunkMap.TrackedEntity tracker = ((ca.spottedleaf.moonrise.patches.entity_tracker.EntityTrackerEntity)entity).moonrise$getTrackedEntity();
|
||||
if (tracker == null) {
|
||||
continue;
|
||||
}
|
||||
if (tracker.getClass() != ChunkMap.TrackedEntity.class) {
|
||||
ctx.citizensEntity(entity);
|
||||
continue;
|
||||
}
|
||||
NearbyPlayers.TrackedChunk trackedChunk = nearbyPlayers.getChunk(entity.chunkPosition());
|
||||
|
||||
tracker.leafTick(ctx, trackedChunk);
|
||||
boolean flag = false;
|
||||
if (tracker.moonrise$hasPlayers()) {
|
||||
flag = true;
|
||||
} else {
|
||||
// may read old value
|
||||
FullChunkStatus status = ((ca.spottedleaf.moonrise.patches.chunk_system.entity.ChunkSystemEntity) entity).moonrise$getChunkStatus();
|
||||
// removed in world
|
||||
if (status != null && status.isOrAfter(FullChunkStatus.ENTITY_TICKING)) {
|
||||
flag = true;
|
||||
}
|
||||
}
|
||||
if (flag) {
|
||||
tracker.serverEntity.leafSendChanges(ctx, tracker);
|
||||
}
|
||||
}
|
||||
return ctx;
|
||||
}
|
||||
}
|
||||
@@ -1,8 +1,10 @@
|
||||
package org.dreeam.leaf.config.modules.async;
|
||||
|
||||
import org.dreeam.leaf.async.tracker.AsyncTracker;
|
||||
import org.dreeam.leaf.config.ConfigModules;
|
||||
import org.dreeam.leaf.config.EnumConfigCategory;
|
||||
import org.dreeam.leaf.config.LeafConfig;
|
||||
import org.dreeam.leaf.config.annotations.Experimental;
|
||||
|
||||
public class MultithreadedTracker extends ConfigModules {
|
||||
|
||||
@@ -10,28 +12,20 @@ public class MultithreadedTracker extends ConfigModules {
|
||||
return EnumConfigCategory.ASYNC.getBaseKeyName() + ".async-entity-tracker";
|
||||
}
|
||||
|
||||
@Experimental
|
||||
public static boolean enabled = false;
|
||||
public static boolean compatModeEnabled = false;
|
||||
public static int asyncEntityTrackerMaxThreads = 0;
|
||||
public static int asyncEntityTrackerKeepalive = 60;
|
||||
public static int asyncEntityTrackerQueueSize = 0;
|
||||
public static int threads = 0;
|
||||
private static boolean asyncMultithreadedTrackerInitialized;
|
||||
|
||||
@Override
|
||||
public void onLoaded() {
|
||||
config.addCommentRegionBased(getBasePath(), """
|
||||
Make entity tracking saving asynchronously, can improve performance significantly,
|
||||
especially in some massive entities in small area situations.""",
|
||||
"""
|
||||
** Experimental Feature **
|
||||
Make entity tracking asynchronously, can improve performance significantly,
|
||||
especially in some massive entities in small area situations.""", """
|
||||
** 实验性功能 **
|
||||
异步实体跟踪,
|
||||
在实体数量多且密集的情况下效果明显.""");
|
||||
config.addCommentRegionBased(getBasePath() + ".compat-mode", """
|
||||
Enable compat mode ONLY if Citizens or NPC plugins using real entity has installed,
|
||||
Compat mode fixed visible issue with player type NPCs of Citizens,
|
||||
But still recommend to use packet based / virtual entity NPC plugin, e.g. ZNPC Plus, Adyeshach, Fancy NPC or else.""",
|
||||
"""
|
||||
是否启用兼容模式,
|
||||
如果你的服务器安装了 Citizens 或其他类似非发包 NPC 插件, 请开启此项.""");
|
||||
|
||||
if (asyncMultithreadedTrackerInitialized) {
|
||||
config.getConfigSection(getBasePath());
|
||||
@@ -39,27 +33,18 @@ public class MultithreadedTracker extends ConfigModules {
|
||||
}
|
||||
asyncMultithreadedTrackerInitialized = true;
|
||||
|
||||
enabled = config.getBoolean(getBasePath() + ".enabled", enabled);
|
||||
compatModeEnabled = config.getBoolean(getBasePath() + ".compat-mode", compatModeEnabled);
|
||||
asyncEntityTrackerMaxThreads = config.getInt(getBasePath() + ".max-threads", asyncEntityTrackerMaxThreads);
|
||||
asyncEntityTrackerKeepalive = config.getInt(getBasePath() + ".keepalive", asyncEntityTrackerKeepalive);
|
||||
asyncEntityTrackerQueueSize = config.getInt(getBasePath() + ".queue-size", asyncEntityTrackerQueueSize);
|
||||
|
||||
if (asyncEntityTrackerMaxThreads < 0)
|
||||
asyncEntityTrackerMaxThreads = Math.max(Runtime.getRuntime().availableProcessors() + asyncEntityTrackerMaxThreads, 1);
|
||||
else if (asyncEntityTrackerMaxThreads == 0)
|
||||
asyncEntityTrackerMaxThreads = Math.max(Runtime.getRuntime().availableProcessors() / 4, 1);
|
||||
|
||||
if (!enabled)
|
||||
asyncEntityTrackerMaxThreads = 0;
|
||||
else
|
||||
LeafConfig.LOGGER.info("Using {} threads for Async Entity Tracker", asyncEntityTrackerMaxThreads);
|
||||
|
||||
if (asyncEntityTrackerQueueSize <= 0)
|
||||
asyncEntityTrackerQueueSize = asyncEntityTrackerMaxThreads * 384;
|
||||
|
||||
enabled = config.getBoolean(getBasePath() + ".enabled", false);
|
||||
threads = config.getInt(getBasePath() + ".threads", 0);
|
||||
int aval = Runtime.getRuntime().availableProcessors();
|
||||
if (threads < 0) {
|
||||
threads = aval + threads;
|
||||
} else if (threads == 0) {
|
||||
threads = Math.min(aval, 8);
|
||||
}
|
||||
threads = Math.max(threads, 1);
|
||||
if (enabled) {
|
||||
org.dreeam.leaf.async.tracker.MultithreadedTracker.init();
|
||||
LeafConfig.LOGGER.info("Using {} threads for Async Entity Tracker", threads);
|
||||
AsyncTracker.init();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
108
leaf-server/src/main/java/org/dreeam/leaf/util/EntitySlice.java
Normal file
108
leaf-server/src/main/java/org/dreeam/leaf/util/EntitySlice.java
Normal file
@@ -0,0 +1,108 @@
|
||||
package org.dreeam.leaf.util;
|
||||
|
||||
import net.minecraft.world.entity.Entity;
|
||||
import org.jspecify.annotations.NullMarked;
|
||||
|
||||
import java.util.Iterator;
|
||||
|
||||
@NullMarked
|
||||
public record EntitySlice(Entity[] array, int start, int end) implements Iterable<Entity> {
|
||||
public EntitySlice(final Entity[] entities) {
|
||||
this(entities, 0, entities.length);
|
||||
}
|
||||
|
||||
public int size() {
|
||||
return end - start;
|
||||
}
|
||||
|
||||
public boolean isEmpty() {
|
||||
return start >= end;
|
||||
}
|
||||
|
||||
public Entity get(final int index) {
|
||||
return array[start + index];
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<Entity> iterator() {
|
||||
return new SliceIterator(this);
|
||||
}
|
||||
|
||||
public EntitySlice[] splitEvenly(int parts) {
|
||||
if (parts > size()) {
|
||||
parts = size();
|
||||
}
|
||||
if (parts <= 1) {
|
||||
return new EntitySlice[]{this};
|
||||
}
|
||||
|
||||
final EntitySlice[] result = new EntitySlice[parts];
|
||||
final int sliceSize = size();
|
||||
final int base = sliceSize / parts;
|
||||
final int remainder = sliceSize % parts;
|
||||
|
||||
int curr = start;
|
||||
for (int i = 0; i < parts; i++) {
|
||||
int endIdx = curr + base + (i < remainder ? 1 : 0);
|
||||
result[i] = new EntitySlice(array, curr, endIdx);
|
||||
curr = endIdx;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public EntitySlice[] splitAt(final int index) {
|
||||
final int m = start + index;
|
||||
return new EntitySlice[]{
|
||||
new EntitySlice(array, start, m),
|
||||
new EntitySlice(array, m, end)
|
||||
};
|
||||
}
|
||||
|
||||
public EntitySlice subSlice(final int startIndex, final int endIndex) {
|
||||
return new EntitySlice(array, start + startIndex, start + endIndex);
|
||||
}
|
||||
|
||||
public EntitySlice subSlice(final int startIndex) {
|
||||
return subSlice(startIndex, size());
|
||||
}
|
||||
|
||||
public EntitySlice[] chunks(final int chunkSize) {
|
||||
if (isEmpty() || chunkSize <= 0) {
|
||||
return new EntitySlice[0];
|
||||
}
|
||||
|
||||
final int len = (size() + chunkSize - 1) / chunkSize;
|
||||
EntitySlice[] result = new EntitySlice[len];
|
||||
|
||||
int curr = start;
|
||||
for (int i = 0; i < len; i++) {
|
||||
final int endIdx = Math.min(curr + chunkSize, end);
|
||||
result[i] = new EntitySlice(array, curr, endIdx);
|
||||
curr = endIdx;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private static final class SliceIterator implements Iterator<Entity> {
|
||||
private final EntitySlice slice;
|
||||
private int current;
|
||||
|
||||
public SliceIterator(EntitySlice slice) {
|
||||
this.slice = slice;
|
||||
this.current = slice.start;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return current < slice.end;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Entity next() {
|
||||
if (!hasNext()) {
|
||||
throw new IndexOutOfBoundsException();
|
||||
}
|
||||
return slice.array[current++];
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,231 @@
|
||||
// Copyright (c) 2018 Aron Wieck Crown Communications GmbH, Karlsruhe, Germany
|
||||
// Licensed under the terms of MIT license and the Apache License (Version 2.0).
|
||||
|
||||
package org.dreeam.leaf.util.queue;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.lang.invoke.VarHandle;
|
||||
|
||||
public final class MpmcQueue<T> {
|
||||
private static final int MAX_IN_PROGRESS = 16;
|
||||
private static final long DONE_MASK = 0x0000_0000_0000_FF00L;
|
||||
private static final long PENDING_MASK = 0x0000_0000_0000_00FFL;
|
||||
private static final long FAST_PATH_MASK = 0x00FF_FFFF_FFFF_FF00L;
|
||||
private static final int MAX_CAPACITY = 1 << 30;
|
||||
private static final int PARALLELISM = Runtime.getRuntime().availableProcessors();
|
||||
|
||||
private static final VarHandle READ;
|
||||
private static final VarHandle WRITE;
|
||||
|
||||
private final int mask;
|
||||
private final T[] buffer;
|
||||
@SuppressWarnings("unused")
|
||||
private final Padded padded1 = new Padded();
|
||||
@SuppressWarnings("FieldMayBeFinal")
|
||||
private volatile long reads = 0L;
|
||||
@SuppressWarnings("unused")
|
||||
private final Padded padded2 = new Padded();
|
||||
@SuppressWarnings("FieldMayBeFinal")
|
||||
private volatile long writes = 0L;
|
||||
|
||||
static {
|
||||
try {
|
||||
MethodHandles.Lookup l = MethodHandles.lookup();
|
||||
READ = l.findVarHandle(MpmcQueue.class, "reads",
|
||||
long.class);
|
||||
WRITE = l.findVarHandle(MpmcQueue.class, "writes",
|
||||
long.class);
|
||||
} catch (ReflectiveOperationException e) {
|
||||
throw new ExceptionInInitializerError(e);
|
||||
}
|
||||
}
|
||||
|
||||
public MpmcQueue(Class<T> clazz, int capacity) {
|
||||
if (capacity <= 0 || capacity > MAX_CAPACITY) {
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
|
||||
this.mask = (1 << (Integer.SIZE - Integer.numberOfLeadingZeros(capacity - 1))) - 1;
|
||||
//noinspection unchecked
|
||||
this.buffer = (clazz == Object.class)
|
||||
? (T[]) new Object[mask + 1]
|
||||
: (T[]) java.lang.reflect.Array.newInstance(clazz, mask + 1);
|
||||
}
|
||||
|
||||
private void spinWait(final int attempts) {
|
||||
if (attempts == 0) {
|
||||
} else if (PARALLELISM != 1 && (attempts & 31) != 31) {
|
||||
Thread.onSpinWait();
|
||||
} else {
|
||||
Thread.yield();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean send(final T item) {
|
||||
long write = (long) WRITE.getAcquire(this);
|
||||
boolean success;
|
||||
long newWrite = 0L;
|
||||
int index = 0;
|
||||
int attempts = 0;
|
||||
while (true) {
|
||||
spinWait(attempts++);
|
||||
final int inProgressCnt = (int) (write & PENDING_MASK);
|
||||
if ((((int) (write >>> 16) + 1) & mask) == (int) ((long) READ.getAcquire(this) >>> 16)) {
|
||||
success = false;
|
||||
break;
|
||||
}
|
||||
|
||||
if (inProgressCnt == MAX_IN_PROGRESS) {
|
||||
write = (long) WRITE.getAcquire(this);
|
||||
continue;
|
||||
}
|
||||
|
||||
index = ((int) (write >>> 16) + inProgressCnt) & mask;
|
||||
|
||||
if (((index + 1) & mask) == (int) ((long) READ.getAcquire(this) >>> 16)) {
|
||||
success = false;
|
||||
break;
|
||||
}
|
||||
|
||||
newWrite = write + 1;
|
||||
if (WRITE.weakCompareAndSetAcquire(this, write, newWrite)) {
|
||||
success = true;
|
||||
break;
|
||||
}
|
||||
write = (long) WRITE.getVolatile(this);
|
||||
}
|
||||
if (!success) {
|
||||
return false;
|
||||
}
|
||||
buffer[index] = item;
|
||||
if ((newWrite & FAST_PATH_MASK) == ((long) index << 16) && index < mask) {
|
||||
WRITE.getAndAddRelease(this, (1L << 16) - 1);
|
||||
} else {
|
||||
write = newWrite;
|
||||
while (true) {
|
||||
final int inProcessCnt = (int) (write & PENDING_MASK);
|
||||
final long n;
|
||||
if (((int) ((write & DONE_MASK) >>> 8) + 1) == inProcessCnt) {
|
||||
n = ((long) (((int) (write >>> 16) + inProcessCnt) & mask)) << 16;
|
||||
} else if ((int) (write >>> 16) == index) {
|
||||
n = (write + (1L << 16) - 1) & (((long) mask << 16) | 0xFFFFL);
|
||||
} else {
|
||||
n = write + (1L << 8);
|
||||
}
|
||||
|
||||
if (WRITE.weakCompareAndSetRelease(this, write, n)) {
|
||||
break;
|
||||
}
|
||||
write = (long) WRITE.getVolatile(this);
|
||||
spinWait(attempts++);
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
public T recv() {
|
||||
long read = (long) READ.getAcquire(this);
|
||||
boolean success;
|
||||
int index = 0;
|
||||
long newRead = 0L;
|
||||
int attempts = 0;
|
||||
while (true) {
|
||||
spinWait(attempts++);
|
||||
final int inProgressCnt = (int) (read & PENDING_MASK);
|
||||
if ((int) (read >>> 16) == (int) ((long) WRITE.getAcquire(this) >>> 16)) {
|
||||
success = false;
|
||||
break;
|
||||
}
|
||||
|
||||
if (inProgressCnt == MAX_IN_PROGRESS) {
|
||||
read = (long) READ.getAcquire(this);
|
||||
continue;
|
||||
}
|
||||
|
||||
index = ((int) (read >>> 16) + inProgressCnt) & mask;
|
||||
|
||||
if (index == (int) ((long) WRITE.getAcquire(this) >>> 16)) {
|
||||
success = false;
|
||||
break;
|
||||
}
|
||||
|
||||
newRead = read + 1;
|
||||
if (READ.weakCompareAndSetAcquire(this, read, newRead)) {
|
||||
success = true;
|
||||
break;
|
||||
}
|
||||
read = (long) READ.getVolatile(this);
|
||||
}
|
||||
if (!success) {
|
||||
return null;
|
||||
}
|
||||
final T result = buffer[index];
|
||||
buffer[index] = null;
|
||||
if ((newRead & FAST_PATH_MASK) == ((long) index << 16) && index < mask) {
|
||||
READ.getAndAddRelease(this, (1L << 16) - 1);
|
||||
} else {
|
||||
read = newRead;
|
||||
while (true) {
|
||||
final int inProcessCnt = (int) (read & PENDING_MASK);
|
||||
final long n;
|
||||
if (((int) ((read & DONE_MASK) >>> 8) + 1) == inProcessCnt) {
|
||||
n = ((long) (((int) (read >>> 16) + inProcessCnt) & mask)) << 16;
|
||||
} else if ((int) (read >>> 16) == index) {
|
||||
n = (read + (1L << 16) - 1) & (((long) mask << 16) | 0xFFFFL);
|
||||
} else {
|
||||
n = read + (1L << 8);
|
||||
}
|
||||
|
||||
if (READ.weakCompareAndSetRelease(this, read, n)) {
|
||||
break;
|
||||
}
|
||||
read = (long) READ.getVolatile(this);
|
||||
spinWait(attempts++);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public int length() {
|
||||
final long readCounters = (long) READ.getVolatile(this);
|
||||
final long writeCounters = (long) WRITE.getVolatile(this);
|
||||
final int readIndex = (int) (readCounters >>> 16);
|
||||
final int writeIndex = (int) (writeCounters >>> 16);
|
||||
return (readIndex <= writeIndex ?
|
||||
writeIndex - readIndex :
|
||||
writeIndex + capacity() - readIndex) - (int) (readCounters & PENDING_MASK);
|
||||
}
|
||||
|
||||
public boolean isEmpty() {
|
||||
return length() == 0;
|
||||
}
|
||||
|
||||
public int capacity() {
|
||||
return buffer.length;
|
||||
}
|
||||
|
||||
public int remaining() {
|
||||
final long readCounters = (long) READ.getVolatile(this);
|
||||
final long writeCounters = (long) WRITE.getVolatile(this);
|
||||
final int cap = capacity();
|
||||
final int readIndex = (int) (readCounters >>> 16);
|
||||
final int writeIndex = (int) (writeCounters >>> 16);
|
||||
final int len = readIndex <= writeIndex ?
|
||||
writeIndex - readIndex :
|
||||
writeIndex + cap - readIndex;
|
||||
return cap - 1 - len - (int) (writeCounters & PENDING_MASK);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
private final static class Padded {
|
||||
private byte i0, i1, i2, i3, i4, i5, i6, i7, i8, i9, i10, i11, i12, i13, i14, i15;
|
||||
private byte j0, j1, j2, j3, j4, j5, j6, j7, j8, j9, j10, j11, j12, j13, j14, j15;
|
||||
private byte k0, k1, k2, k3, k4, k5, k6, k7, k8, k9, k10, k11, k12, k13, k14, k15;
|
||||
private byte l0, l1, l2, l3, l4, l5, l6, l7, l8, l9, l10, l11, l12, l13, l14, l15;
|
||||
private byte m0, m1, m2, m3, m4, m5, m6, m7, m8, m9, m10, m11, m12, m13, m14, m15;
|
||||
private byte n0, n1, n2, n3, n4, n5, n6, n7, n8, n9, n10, n11, n12, n13, n14, n15;
|
||||
private byte o0, o1, o2, o3, o4, o5, o6, o7, o8, o9, o10, o11, o12, o13, o14, o15;
|
||||
private byte p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11, p12, p13, p14, p15;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user