diff --git a/sources/src/main/java/co/aikar/timings/TimingData.java b/sources/src/main/java/co/aikar/timings/TimingData.java new file mode 100644 index 000000000..6178328f2 --- /dev/null +++ b/sources/src/main/java/co/aikar/timings/TimingData.java @@ -0,0 +1,127 @@ +/* + * This file is licensed under the MIT License (MIT). + * + * Copyright (c) 2014 Daniel Ennis + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package co.aikar.timings; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.LongAdder; + +import static co.aikar.util.JSONUtil.toArray; + +/** + * Akarin Changes Note + * 1) Thread safe timing (safety) + */ +/** + *

Lightweight object for tracking timing data

+ * + * This is broken out to reduce memory usage + */ +class TimingData { + private final int id; + private int count = 0; + private int lagCount = 0; + private long totalTime = 0; + private long lagTotalTime = 0; + private AtomicInteger curTickCount = new AtomicInteger(); // Akarin + private LongAdder curTickTotal = new LongAdder(); // Akarin + + TimingData(int id) { + this.id = id; + } + + private TimingData(TimingData data) { + this.id = data.id; + this.totalTime = data.totalTime; + this.lagTotalTime = data.lagTotalTime; + this.count = data.count; + this.lagCount = data.lagCount; + } + + void add(long diff) { + curTickCount.incrementAndGet(); + curTickTotal.add(diff); + } + + void processTick(boolean violated) { + totalTime += curTickTotal.sum(); + count += curTickCount.get(); + if (violated) { + lagTotalTime += curTickTotal.sum(); + lagCount += curTickCount.get(); + } + curTickTotal.reset(); + curTickCount.set(0); + } + + void reset() { + count = 0; + lagCount = 0; + curTickTotal.reset(); + curTickCount.set(0); + totalTime = 0; + lagTotalTime = 0; + } + + protected TimingData clone() { + return new TimingData(this); + } + + List export() { + List list = toArray( + id, + count, + totalTime); + if (lagCount > 0) { + list.add(lagCount); + list.add(lagTotalTime); + } + return list; + } + + boolean hasData() { + return count > 0; + } + + long getTotalTime() { + return totalTime; + } + + int getCurTickCount() { + return curTickCount.get(); + } + + void setCurTickCount(int curTickCount) { + this.curTickCount.getAndSet(curTickCount); + } + + long getCurTickTotal() { + return curTickTotal.sum(); + } + + void setCurTickTotal(long curTickTotal) { + this.curTickTotal.reset(); + this.curTickTotal.add(curTickTotal); + } +} diff --git a/sources/src/main/java/co/aikar/timings/TimingHandler.java b/sources/src/main/java/co/aikar/timings/TimingHandler.java index 964322ead..ce5799278 100644 --- a/sources/src/main/java/co/aikar/timings/TimingHandler.java +++ b/sources/src/main/java/co/aikar/timings/TimingHandler.java @@ -24,17 +24,20 @@ package co.aikar.timings; import co.aikar.util.LoadingIntMap; +import io.akarin.api.internal.Akari; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; + import org.bukkit.Bukkit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; /** * Akarin Changes Note - * 1) Make class public (compatibility) - * 2) Add stopTiming method that accept given start time (compatibility) + * 1) Thread safe timing (safety) */ -public class TimingHandler implements Timing { // Akarin +class TimingHandler implements Timing { private static int idPool = 1; final int id = idPool++; @@ -47,12 +50,12 @@ public class TimingHandler implements Timing { // Akarin final TimingData record; private final TimingHandler groupHandler; - private long start = 0; - private int timingDepth = 0; - private boolean added; + private AtomicLong start = new AtomicLong(); // Akarin + private AtomicInteger timingDepth = new AtomicInteger(); // Akarin + private volatile boolean added; // Akarin private boolean timed; private boolean enabled; - private TimingHandler parent; + private volatile TimingHandler parent; // Akarin TimingHandler(TimingIdentifier id) { if (id.name.startsWith("##")) { @@ -75,16 +78,18 @@ public class TimingHandler implements Timing { // Akarin } void processTick(boolean violated) { - if (timingDepth != 0 || record.getCurTickCount() == 0) { - timingDepth = 0; - start = 0; + if (timingDepth.get() != 0 || record.getCurTickCount() == 0) { + timingDepth.set(0); + start.set(0); return; } record.processTick(violated); + Akari.timingsLock.lock(); // Akarin for (TimingData handler : children.values()) { handler.processTick(violated); } + Akari.timingsLock.unlock(); // Akarin } @Override @@ -102,42 +107,32 @@ public class TimingHandler implements Timing { // Akarin } } - @Override public Timing startTiming() { - if (enabled && ++timingDepth == 1) { - start = System.nanoTime(); + if (enabled && timingDepth.incrementAndGet() == 1) { + start.getAndSet(System.nanoTime()); parent = TimingsManager.CURRENT; TimingsManager.CURRENT = this; } return this; } - @Override public void stopTiming() { - if (enabled && --timingDepth == 0 && start != 0) { + if (enabled && timingDepth.decrementAndGet() == 0 && start.get() != 0) { if (!Bukkit.isPrimaryThread()) { Bukkit.getLogger().log(Level.SEVERE, "stopTiming called async for " + name); new Throwable().printStackTrace(); - start = 0; + start.getAndSet(0); return; } - addDiff(System.nanoTime() - start); - start = 0; + long prev = start.getAndSet(0); // Akarin + addDiff(System.nanoTime() - prev); // Akarin } } - - // Akarin start - public void stopTiming(long start) { - if (enabled && --timingDepth == 0 && start != 0) { - addDiff(System.nanoTime() - start); - } - } - // Akarin end @Override public void abort() { - if (enabled && timingDepth > 0) { - start = 0; + if (enabled && timingDepth.get() > 0) { + start.getAndSet(0); } } @@ -145,18 +140,24 @@ public class TimingHandler implements Timing { // Akarin if (TimingsManager.CURRENT == this) { TimingsManager.CURRENT = parent; if (parent != null) { + Akari.timingsLock.lock(); // Akarin parent.children.get(id).add(diff); + Akari.timingsLock.unlock(); // Akarin } } record.add(diff); if (!added) { added = true; timed = true; + Akari.timingsLock.lock(); // Akarin TimingsManager.HANDLERS.add(this); + Akari.timingsLock.unlock(); // Akarin } if (groupHandler != null) { groupHandler.addDiff(diff); + Akari.timingsLock.lock(); // Akarin groupHandler.children.get(id).add(diff); + Akari.timingsLock.unlock(); // Akarin } } @@ -170,10 +171,12 @@ public class TimingHandler implements Timing { // Akarin if (full) { timed = false; } - start = 0; - timingDepth = 0; + start.set(0); + timingDepth.set(0); added = false; + Akari.timingsLock.lock(); // Akarin children.clear(); + Akari.timingsLock.unlock(); // Akarin checkEnabled(); } @@ -214,11 +217,13 @@ public class TimingHandler implements Timing { // Akarin } TimingData[] cloneChildren() { - final TimingData[] clonedChildren = new TimingData[children.size()]; int i = 0; + Akari.timingsLock.lock(); // Akarin + final TimingData[] clonedChildren = new TimingData[children.size()]; for (TimingData child : children.values()) { clonedChildren[i++] = child.clone(); } + Akari.timingsLock.unlock(); // Akarin return clonedChildren; } } diff --git a/sources/src/main/java/co/aikar/timings/TimingHistory.java b/sources/src/main/java/co/aikar/timings/TimingHistory.java new file mode 100644 index 000000000..a5c5922de --- /dev/null +++ b/sources/src/main/java/co/aikar/timings/TimingHistory.java @@ -0,0 +1,355 @@ +/* + * This file is licensed under the MIT License (MIT). + * + * Copyright (c) 2014 Daniel Ennis + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package co.aikar.timings; + +import co.aikar.timings.TimingHistory.RegionData.RegionId; +import co.aikar.util.JSONUtil; +import com.google.common.base.Function; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.bukkit.Bukkit; +import org.bukkit.Chunk; +import org.bukkit.Material; +import org.bukkit.World; +import org.bukkit.block.BlockState; +import org.bukkit.entity.Entity; +import org.bukkit.entity.EntityType; +import org.bukkit.entity.Player; +import co.aikar.util.LoadingMap; +import co.aikar.util.MRUMapCache; +import io.akarin.api.internal.Akari; + +import java.lang.management.ManagementFactory; +import java.util.Collection; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static co.aikar.timings.TimingsManager.FULL_SERVER_TICK; +import static co.aikar.timings.TimingsManager.MINUTE_REPORTS; +import static co.aikar.util.JSONUtil.*; + +@SuppressWarnings({"deprecation", "SuppressionAnnotation", "Convert2Lambda", "Anonymous2MethodRef"}) +public class TimingHistory { + public static long lastMinuteTime; + public static long timedTicks; + public static long playerTicks; + public static long entityTicks; + public static long tileEntityTicks; + public static long activatedEntityTicks; + private static int worldIdPool = 1; + static Map worldMap = LoadingMap.newHashMap(new Function() { + @Override + public Integer apply(String input) { + return worldIdPool++; + } + }); + private final long endTime; + private final long startTime; + private final long totalTicks; + private final long totalTime; // Represents all time spent running the server this history + private final MinuteReport[] minuteReports; + + private final TimingHistoryEntry[] entries; + final Set tileEntityTypeSet = Sets.newHashSet(); + final Set entityTypeSet = Sets.newHashSet(); + private final Map worlds; + + TimingHistory() { + this.endTime = System.currentTimeMillis() / 1000; + this.startTime = TimingsManager.historyStart / 1000; + if (timedTicks % 1200 != 0 || MINUTE_REPORTS.isEmpty()) { + this.minuteReports = MINUTE_REPORTS.toArray(new MinuteReport[MINUTE_REPORTS.size() + 1]); + this.minuteReports[this.minuteReports.length - 1] = new MinuteReport(); + } else { + this.minuteReports = MINUTE_REPORTS.toArray(new MinuteReport[MINUTE_REPORTS.size()]); + } + long ticks = 0; + for (MinuteReport mp : this.minuteReports) { + ticks += mp.ticksRecord.timed; + } + this.totalTicks = ticks; + this.totalTime = FULL_SERVER_TICK.record.getTotalTime(); + Akari.timingsLock.lock(); // Akarin + this.entries = new TimingHistoryEntry[TimingsManager.HANDLERS.size()]; + + int i = 0; + for (TimingHandler handler : TimingsManager.HANDLERS) { + entries[i++] = new TimingHistoryEntry(handler); + } + Akari.timingsLock.unlock(); // Akarin + + + // Information about all loaded chunks/entities + //noinspection unchecked + this.worlds = toObjectMapper(Bukkit.getWorlds(), new Function() { + @Override + public JSONPair apply(World world) { + Map regions = LoadingMap.newHashMap(RegionData.LOADER); + + for (Chunk chunk : world.getLoadedChunks()) { + RegionData data = regions.get(new RegionId(chunk.getX(), chunk.getZ())); + + for (Entity entity : chunk.getEntities()) { + if (entity == null) { + Bukkit.getLogger().warning("Null entity detected in chunk at position x: " + chunk.getX() + ", z: " + chunk.getZ()); + continue; + } + + data.entityCounts.get(entity.getType()).increment(); + } + + for (BlockState tileEntity : chunk.getTileEntities()) { + if (tileEntity == null) { + Bukkit.getLogger().warning("Null tileentity detected in chunk at position x: " + chunk.getX() + ", z: " + chunk.getZ()); + continue; + } + + data.tileEntityCounts.get(tileEntity.getBlock().getType()).increment(); + } + } + return pair( + worldMap.get(world.getName()), + toArrayMapper(regions.values(),new Function() { + @Override + public Object apply(RegionData input) { + return toArray( + input.regionId.x, + input.regionId.z, + toObjectMapper(input.entityCounts.entrySet(), + new Function, JSONPair>() { + @Override + public JSONPair apply(Map.Entry entry) { + entityTypeSet.add(entry.getKey()); + return pair( + String.valueOf(entry.getKey().getTypeId()), + entry.getValue().count() + ); + } + } + ), + toObjectMapper(input.tileEntityCounts.entrySet(), + new Function, JSONPair>() { + @Override + public JSONPair apply(Map.Entry entry) { + tileEntityTypeSet.add(entry.getKey()); + return pair( + String.valueOf(entry.getKey().getId()), + entry.getValue().count() + ); + } + } + ) + ); + } + }) + ); + } + }); + } + static class RegionData { + final RegionId regionId; + @SuppressWarnings("Guava") + static Function LOADER = new Function() { + @Override + public RegionData apply(RegionId id) { + return new RegionData(id); + } + }; + RegionData(RegionId id) { + this.regionId = id; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + RegionData that = (RegionData) o; + + return regionId.equals(that.regionId); + + } + + @Override + public int hashCode() { + return regionId.hashCode(); + } + + @SuppressWarnings("unchecked") + final Map entityCounts = MRUMapCache.of(LoadingMap.of( + new EnumMap(EntityType.class), Counter.LOADER + )); + @SuppressWarnings("unchecked") + final Map tileEntityCounts = MRUMapCache.of(LoadingMap.of( + new EnumMap(Material.class), Counter.LOADER + )); + + static class RegionId { + final int x, z; + final long regionId; + RegionId(int x, int z) { + this.x = x >> 5 << 5; + this.z = z >> 5 << 5; + this.regionId = ((long) (this.x) << 32) + (this.z >> 5 << 5) - Integer.MIN_VALUE; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + RegionId regionId1 = (RegionId) o; + + return regionId == regionId1.regionId; + + } + + @Override + public int hashCode() { + return (int) (regionId ^ (regionId >>> 32)); + } + } + } + static void resetTicks(boolean fullReset) { + if (fullReset) { + // Non full is simply for 1 minute reports + timedTicks = 0; + } + lastMinuteTime = System.nanoTime(); + playerTicks = 0; + tileEntityTicks = 0; + entityTicks = 0; + activatedEntityTicks = 0; + } + + Object export() { + return createObject( + pair("s", startTime), + pair("e", endTime), + pair("tk", totalTicks), + pair("tm", totalTime), + pair("w", worlds), + pair("h", toArrayMapper(entries, new Function() { + @Override + public Object apply(TimingHistoryEntry entry) { + TimingData record = entry.data; + if (!record.hasData()) { + return null; + } + return entry.export(); + } + })), + pair("mp", toArrayMapper(minuteReports, new Function() { + @Override + public Object apply(MinuteReport input) { + return input.export(); + } + })) + ); + } + + static class MinuteReport { + final long time = System.currentTimeMillis() / 1000; + + final TicksRecord ticksRecord = new TicksRecord(); + final PingRecord pingRecord = new PingRecord(); + final TimingData fst = TimingsManager.FULL_SERVER_TICK.minuteData.clone(); + final double tps = 1E9 / ( System.nanoTime() - lastMinuteTime ) * ticksRecord.timed; + final double usedMemory = TimingsManager.FULL_SERVER_TICK.avgUsedMemory; + final double freeMemory = TimingsManager.FULL_SERVER_TICK.avgFreeMemory; + final double loadAvg = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage(); + + List export() { + return toArray( + time, + Math.round(tps * 100D) / 100D, + Math.round(pingRecord.avg * 100D) / 100D, + fst.export(), + toArray(ticksRecord.timed, + ticksRecord.player, + ticksRecord.entity, + ticksRecord.activatedEntity, + ticksRecord.tileEntity + ), + usedMemory, + freeMemory, + loadAvg + ); + } + } + + private static class TicksRecord { + final long timed; + final long player; + final long entity; + final long tileEntity; + final long activatedEntity; + + TicksRecord() { + timed = timedTicks - (TimingsManager.MINUTE_REPORTS.size() * 1200); + player = playerTicks; + entity = entityTicks; + tileEntity = tileEntityTicks; + activatedEntity = activatedEntityTicks; + } + + } + + private static class PingRecord { + final double avg; + + PingRecord() { + final Collection onlinePlayers = Bukkit.getOnlinePlayers(); + int totalPing = 0; + for (Player player : onlinePlayers) { + totalPing += player.spigot().getPing(); + } + avg = onlinePlayers.isEmpty() ? 0 : totalPing / onlinePlayers.size(); + } + } + + + private static class Counter { + private int count = 0; + @SuppressWarnings({"rawtypes", "SuppressionAnnotation", "Guava"}) + static Function LOADER = new LoadingMap.Feeder() { + @Override + public Counter apply() { + return new Counter(); + } + }; + public int increment() { + return ++count; + } + public int count() { + return count; + } + } +} diff --git a/sources/src/main/java/co/aikar/timings/TimingsManager.java b/sources/src/main/java/co/aikar/timings/TimingsManager.java new file mode 100644 index 000000000..b1349ba8d --- /dev/null +++ b/sources/src/main/java/co/aikar/timings/TimingsManager.java @@ -0,0 +1,207 @@ +/* + * This file is licensed under the MIT License (MIT). + * + * Copyright (c) 2014 Daniel Ennis + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package co.aikar.timings; + +import com.google.common.base.Function; +import com.google.common.collect.EvictingQueue; +import org.bukkit.Bukkit; +import org.bukkit.Server; +import org.bukkit.command.Command; +import org.bukkit.plugin.Plugin; +import org.bukkit.plugin.java.PluginClassLoader; +import co.aikar.util.LoadingMap; +import io.akarin.api.internal.Akari; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.logging.Level; + +/** + * Akarin Changes Note + * 1) Thread safe timing (safety) + */ +public final class TimingsManager { + static final Map TIMING_MAP = + Collections.synchronizedMap(LoadingMap.newHashMap( + new Function() { + @Override + public TimingHandler apply(TimingIdentifier id) { + return (id.protect ? + new UnsafeTimingHandler(id) : + new TimingHandler(id) + ); + } + }, + 256, .5F + )); + public static final FullServerTickHandler FULL_SERVER_TICK = new FullServerTickHandler(); + public static final TimingHandler TIMINGS_TICK = Timings.ofSafe("Timings Tick", FULL_SERVER_TICK); + public static final Timing PLUGIN_GROUP_HANDLER = Timings.ofSafe("Plugins"); + public static List hiddenConfigs = new ArrayList(); + public static boolean privacy = false; + + static final Collection HANDLERS = new ArrayDeque(); + static final ArrayDeque MINUTE_REPORTS = new ArrayDeque(); + + static EvictingQueue HISTORY = EvictingQueue.create(12); + static TimingHandler CURRENT; + static long timingStart = 0; + static long historyStart = 0; + static boolean needsFullReset = false; + static boolean needsRecheckEnabled = false; + + private TimingsManager() {} + + /** + * Resets all timing data on the next tick + */ + static void reset() { + needsFullReset = true; + } + + /** + * Ticked every tick by CraftBukkit to count the number of times a timer + * caused TPS loss. + */ + static void tick() { + if (Timings.timingsEnabled) { + boolean violated = FULL_SERVER_TICK.isViolated(); + + Akari.timingsLock.lock(); // Akarin + for (TimingHandler handler : HANDLERS) { + if (handler.isSpecial()) { + // We manually call this + continue; + } + handler.processTick(violated); + } + Akari.timingsLock.unlock(); // Akarin + + TimingHistory.playerTicks += Bukkit.getOnlinePlayers().size(); + TimingHistory.timedTicks++; + // Generate TPS/Ping/Tick reports every minute + } + } + static void stopServer() { + Timings.timingsEnabled = false; + recheckEnabled(); + } + static void recheckEnabled() { + synchronized (TIMING_MAP) { + for (TimingHandler timings : TIMING_MAP.values()) { + timings.checkEnabled(); + } + } + needsRecheckEnabled = false; + } + static void resetTimings() { + if (needsFullReset) { + // Full resets need to re-check every handlers enabled state + // Timing map can be modified from async so we must sync on it. + synchronized (TIMING_MAP) { + for (TimingHandler timings : TIMING_MAP.values()) { + timings.reset(true); + } + } + Bukkit.getLogger().log(Level.INFO, "Timings Reset"); + HISTORY.clear(); + needsFullReset = false; + needsRecheckEnabled = false; + timingStart = System.currentTimeMillis(); + } else { + // Soft resets only need to act on timings that have done something + // Handlers can only be modified on main thread. + Akari.timingsLock.lock(); // Akarin + for (TimingHandler timings : HANDLERS) { + timings.reset(false); + } + Akari.timingsLock.unlock(); // Akarin + } + + Akari.timingsLock.lock(); // Akarin + HANDLERS.clear(); + Akari.timingsLock.unlock(); // Akarin + MINUTE_REPORTS.clear(); + + TimingHistory.resetTicks(true); + historyStart = System.currentTimeMillis(); + } + + static TimingHandler getHandler(String group, String name, Timing parent, boolean protect) { + return TIMING_MAP.get(new TimingIdentifier(group, name, parent, protect)); + } + + + /** + *

Due to access restrictions, we need a helper method to get a Command TimingHandler with String group

+ * + * Plugins should never call this + * + * @param pluginName Plugin this command is associated with + * @param command Command to get timings for + * @return TimingHandler + */ + public static Timing getCommandTiming(String pluginName, Command command) { + Plugin plugin = null; + final Server server = Bukkit.getServer(); + if (!( server == null || pluginName == null || + "minecraft".equals(pluginName) || "bukkit".equals(pluginName) || + "spigot".equalsIgnoreCase(pluginName) || "paper".equals(pluginName) + )) { + plugin = server.getPluginManager().getPlugin(pluginName); + } + if (plugin == null) { + // Plugin is passing custom fallback prefix, try to look up by class loader + plugin = getPluginByClassloader(command.getClass()); + } + if (plugin == null) { + return Timings.ofSafe("Command: " + pluginName + ":" + command.getTimingName()); + } + + return Timings.ofSafe(plugin, "Command: " + pluginName + ":" + command.getTimingName()); + } + + /** + * Looks up the class loader for the specified class, and if it is a PluginClassLoader, return the + * Plugin that created this class. + * + * @param clazz Class to check + * @return Plugin if created by a plugin + */ + public static Plugin getPluginByClassloader(Class clazz) { + if (clazz == null) { + return null; + } + final ClassLoader classLoader = clazz.getClassLoader(); + if (classLoader instanceof PluginClassLoader) { + PluginClassLoader pluginClassLoader = (PluginClassLoader) classLoader; + return pluginClassLoader.getPlugin(); + } + return null; + } +} diff --git a/sources/src/main/java/io/akarin/api/internal/Akari.java b/sources/src/main/java/io/akarin/api/internal/Akari.java index 7845ba1ea..9f796e959 100644 --- a/sources/src/main/java/io/akarin/api/internal/Akari.java +++ b/sources/src/main/java/io/akarin/api/internal/Akari.java @@ -4,7 +4,9 @@ import java.lang.reflect.Field; import java.lang.reflect.Method; import java.util.Queue; import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; @@ -14,8 +16,14 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import co.aikar.timings.Timing; import co.aikar.timings.Timings; +import io.akarin.api.internal.Akari.AssignableFactory; +import io.akarin.api.internal.Akari.TimingSignal; +import io.akarin.api.internal.utils.ReentrantSpinningLock; +import io.akarin.api.internal.utils.thread.SuspendableExecutorCompletionService; +import io.akarin.api.internal.utils.thread.SuspendableThreadPoolExecutor; import io.akarin.server.core.AkarinGlobalConfig; import net.minecraft.server.MinecraftServer; +import net.minecraft.server.World; import net.minecraft.server.WorldServer; @SuppressWarnings("restriction") @@ -61,8 +69,43 @@ public abstract class Akari { } } - public static ExecutorCompletionService STAGE_ENTITY_TICK; - public static ExecutorCompletionService STAGE_WORLD_TICK; + public static class TimingSignal { + public final World tickedWorld; + public final boolean isEntities; + + public TimingSignal(World world, boolean entities) { + tickedWorld = world; + isEntities = entities; + } + } + + public static SuspendableExecutorCompletionService STAGE_TICK; + + static { + resizeTickExecutors(3); + } + + public static void resizeTickExecutors(int worlds) { + int parallelism; + switch (AkarinGlobalConfig.parallelMode) { + case -1: + return; + case 0: + parallelism = 2; + break; + case 1: + parallelism = worlds + 1; + break; + case 2: + default: + parallelism = worlds * 2; + break; + } + STAGE_TICK = new SuspendableExecutorCompletionService<>(new SuspendableThreadPoolExecutor(parallelism, parallelism, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), + new AssignableFactory("Akarin Parallel Ticking Thread - $"))); + } public static boolean isPrimaryThread() { return isPrimaryThread(true); @@ -97,6 +140,8 @@ public abstract class Akari { return serverVersion + " (MC: " + MinecraftServer.getServer().getVersion() + ")"; } + public static final ReentrantSpinningLock timingsLock = new ReentrantSpinningLock(); + /* * Timings */ diff --git a/sources/src/main/java/io/akarin/api/internal/utils/ReentrantSpinningLock.java b/sources/src/main/java/io/akarin/api/internal/utils/ReentrantSpinningLock.java new file mode 100644 index 000000000..e05c9f792 --- /dev/null +++ b/sources/src/main/java/io/akarin/api/internal/utils/ReentrantSpinningLock.java @@ -0,0 +1,98 @@ +package io.akarin.api.internal.utils; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +public class ReentrantSpinningLock { + /* + * Impl Note: + * A write lock can reentrant as a read lock, while a + * read lock is not allowed to reentrant as a write lock. + */ + private final AtomicBoolean writeLocked = new AtomicBoolean(false); + + // --------- Thread local restricted fields --------- + private long heldThreadId = 0; + private int reentrantLocks = 0; + + /** + * Lock as a typical write lock + */ + public void lock() { + long currentThreadId = Thread.currentThread().getId(); + if (heldThreadId == currentThreadId) { + reentrantLocks++; + } else { + while (!writeLocked.compareAndSet(false, true)) ; // In case acquire one lock concurrently + heldThreadId = currentThreadId; + } + } + + public void unlock() { + if (reentrantLocks == 0) { + heldThreadId = 0; + if (readerThreads.get() == 0 || readerThreads.getAndDecrement() == 1) { // Micro-optimization: this saves one subtract + writeLocked.set(false); + } + } else { + --reentrantLocks; + } + } + + private final AtomicInteger readerThreads = new AtomicInteger(0); + + /** + * Lock as a typical read lock + */ + public void lockWeak() { + long currentThreadId = Thread.currentThread().getId(); + if (heldThreadId == currentThreadId) { + reentrantLocks++; + } else { + if (readerThreads.get() == 0) { + while (!writeLocked.compareAndSet(false, true)) ; // Block future write lock + } + heldThreadId = currentThreadId; + readerThreads.getAndIncrement(); // Micro-optimization: this saves one plus + } + } + + public void unlockWeak() { + if (reentrantLocks == 0) { + heldThreadId = 0; + writeLocked.set(false); + } else { + --reentrantLocks; + } + } + + // --------- Wrappers to allow typical usages --------- + private SpinningWriteLock wrappedWriteLock = new SpinningWriteLock(); + private SpinningReadLock wrappedReadLock = new SpinningReadLock(); + + public class SpinningWriteLock { + public void lock() { + lock(); + } + public void unlock() { + unlock(); + } + } + + public class SpinningReadLock { + public void lock() { + lockWeak(); + } + public void unlock() { + unlockWeak(); + } + } + + public SpinningWriteLock writeLock() { + return wrappedWriteLock; + } + + public SpinningReadLock readLocked() { + return wrappedReadLock; + } +} diff --git a/sources/src/main/java/io/akarin/api/internal/utils/thread/OpenExecutionException.java b/sources/src/main/java/io/akarin/api/internal/utils/thread/OpenExecutionException.java new file mode 100644 index 000000000..215952436 --- /dev/null +++ b/sources/src/main/java/io/akarin/api/internal/utils/thread/OpenExecutionException.java @@ -0,0 +1,7 @@ +package io.akarin.api.internal.utils.thread; + +import java.util.concurrent.ExecutionException; + +public class OpenExecutionException extends ExecutionException { + private static final long serialVersionUID = 7830266012832686185L; +} diff --git a/sources/src/main/java/io/akarin/api/internal/utils/thread/SuspendableExecutorCompletionService.java b/sources/src/main/java/io/akarin/api/internal/utils/thread/SuspendableExecutorCompletionService.java new file mode 100644 index 000000000..6204d15c8 --- /dev/null +++ b/sources/src/main/java/io/akarin/api/internal/utils/thread/SuspendableExecutorCompletionService.java @@ -0,0 +1,142 @@ +/* + * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms. + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + */ + +/* + * + * + * + * + * + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/publicdomain/zero/1.0/ + */ + +package io.akarin.api.internal.utils.thread; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.TimeUnit; + +public class SuspendableExecutorCompletionService implements CompletionService { + private final SuspendableThreadPoolExecutor executor; + private final BlockingQueue> completionQueue; + + public void suspend() { + executor.suspend(); + } + + public void resume() { + executor.resume(); + } + + /** + * FutureTask extension to enqueue upon completion + */ + private class QueueingFuture extends FutureTask { + QueueingFuture(RunnableFuture task) { + super(task, null); + this.task = task; + } + protected void done() { completionQueue.add(task); } + private final Future task; + } + + private RunnableFuture newTaskFor(Callable task) { + return new FutureTask(task); + } + + private RunnableFuture newTaskFor(Runnable task, V result) { + return new FutureTask(task, result); + } + + /** + * Creates an ExecutorCompletionService using the supplied + * executor for base task execution and a + * {@link LinkedBlockingQueue} as a completion queue. + * + * @param executor the executor to use + * @throws NullPointerException if executor is {@code null} + */ + public SuspendableExecutorCompletionService(SuspendableThreadPoolExecutor executor) { + if (executor == null) + throw new NullPointerException(); + this.executor = executor; + this.completionQueue = new LinkedBlockingQueue>(); + } + + /** + * Creates an ExecutorCompletionService using the supplied + * executor for base task execution and the supplied queue as its + * completion queue. + * + * @param executor the executor to use + * @param completionQueue the queue to use as the completion queue + * normally one dedicated for use by this service. This + * queue is treated as unbounded -- failed attempted + * {@code Queue.add} operations for completed tasks cause + * them not to be retrievable. + * @throws NullPointerException if executor or completionQueue are {@code null} + */ + public SuspendableExecutorCompletionService(SuspendableThreadPoolExecutor executor, + BlockingQueue> completionQueue) { + if (executor == null || completionQueue == null) + throw new NullPointerException(); + this.executor = executor; + this.completionQueue = completionQueue; + } + + public Future submit(Callable task) { + if (task == null) throw new NullPointerException(); + RunnableFuture f = newTaskFor(task); + executor.execute(new QueueingFuture(f)); + return f; + } + + public Future submit(Runnable task, V result) { + if (task == null) throw new NullPointerException(); + RunnableFuture f = newTaskFor(task, result); + executor.execute(new QueueingFuture(f)); + return f; + } + + public Future take() throws InterruptedException { + return completionQueue.take(); + } + + public Future poll() { + return completionQueue.poll(); + } + + public Future poll(long timeout, TimeUnit unit) + throws InterruptedException { + return completionQueue.poll(timeout, unit); + } + +} diff --git a/sources/src/main/java/io/akarin/api/internal/utils/thread/SuspendableThreadPoolExecutor.java b/sources/src/main/java/io/akarin/api/internal/utils/thread/SuspendableThreadPoolExecutor.java new file mode 100644 index 000000000..317562963 --- /dev/null +++ b/sources/src/main/java/io/akarin/api/internal/utils/thread/SuspendableThreadPoolExecutor.java @@ -0,0 +1,1842 @@ +/* + * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms. + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + */ + +/* + * + * + * + * + * + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/publicdomain/zero/1.0/ + */ + +package io.akarin.api.internal.utils.thread; + +import java.util.concurrent.locks.AbstractQueuedSynchronizer; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.*; + +public class SuspendableThreadPoolExecutor extends ThreadPoolExecutor { + /** + * The main pool control state, ctl, is an atomic integer packing + * two conceptual fields + * workerCount, indicating the effective number of threads + * runState, indicating whether running, shutting down etc + * + * In order to pack them into one int, we limit workerCount to + * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2 + * billion) otherwise representable. If this is ever an issue in + * the future, the variable can be changed to be an AtomicLong, + * and the shift/mask constants below adjusted. But until the need + * arises, this code is a bit faster and simpler using an int. + * + * The workerCount is the number of workers that have been + * permitted to start and not permitted to stop. The value may be + * transiently different from the actual number of live threads, + * for example when a ThreadFactory fails to create a thread when + * asked, and when exiting threads are still performing + * bookkeeping before terminating. The user-visible pool size is + * reported as the current size of the workers set. + * + * The runState provides the main lifecycle control, taking on values: + * + * RUNNING: Accept new tasks and process queued tasks + * SHUTDOWN: Don't accept new tasks, but process queued tasks + * STOP: Don't accept new tasks, don't process queued tasks, + * and interrupt in-progress tasks + * TIDYING: All tasks have terminated, workerCount is zero, + * the thread transitioning to state TIDYING + * will run the terminated() hook method + * TERMINATED: terminated() has completed + * + * The numerical order among these values matters, to allow + * ordered comparisons. The runState monotonically increases over + * time, but need not hit each state. The transitions are: + * + * RUNNING -> SHUTDOWN + * On invocation of shutdown(), perhaps implicitly in finalize() + * (RUNNING or SHUTDOWN) -> STOP + * On invocation of shutdownNow() + * SHUTDOWN -> TIDYING + * When both queue and pool are empty + * STOP -> TIDYING + * When pool is empty + * TIDYING -> TERMINATED + * When the terminated() hook method has completed + * + * Threads waiting in awaitTermination() will return when the + * state reaches TERMINATED. + * + * Detecting the transition from SHUTDOWN to TIDYING is less + * straightforward than you'd like because the queue may become + * empty after non-empty and vice versa during SHUTDOWN state, but + * we can only terminate if, after seeing that it is empty, we see + * that workerCount is 0 (which sometimes entails a recheck -- see + * below). + */ + private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); + private static final int COUNT_BITS = Integer.SIZE - 3; + private static final int CAPACITY = (1 << COUNT_BITS) - 1; + + // runState is stored in the high-order bits + private static final int RUNNING = -1 << COUNT_BITS; + private static final int SHUTDOWN = 0 << COUNT_BITS; + private static final int STOP = 1 << COUNT_BITS; + private static final int TIDYING = 2 << COUNT_BITS; + private static final int TERMINATED = 3 << COUNT_BITS; + + // Packing and unpacking ctl + private static int runStateOf(int c) { return c & ~CAPACITY; } + private static int workerCountOf(int c) { return c & CAPACITY; } + private static int ctlOf(int rs, int wc) { return rs | wc; } + + public void suspend() { + Thread curThread = Thread.currentThread(); + for (Worker worker : workers) if (worker.thread != curThread) worker.thread.suspend(); + } + + public void resume() { + Thread curThread = Thread.currentThread(); + for (Worker worker : workers) if (worker.thread != curThread) worker.thread.resume(); + } + + /* + * Bit field accessors that don't require unpacking ctl. + * These depend on the bit layout and on workerCount being never negative. + */ + + private static boolean runStateLessThan(int c, int s) { + return c < s; + } + + private static boolean runStateAtLeast(int c, int s) { + return c >= s; + } + + private static boolean isRunning(int c) { + return c < SHUTDOWN; + } + + /** + * Attempts to CAS-increment the workerCount field of ctl. + */ + private boolean compareAndIncrementWorkerCount(int expect) { + return ctl.compareAndSet(expect, expect + 1); + } + + /** + * Attempts to CAS-decrement the workerCount field of ctl. + */ + private boolean compareAndDecrementWorkerCount(int expect) { + return ctl.compareAndSet(expect, expect - 1); + } + + /** + * Decrements the workerCount field of ctl. This is called only on + * abrupt termination of a thread (see processWorkerExit). Other + * decrements are performed within getTask. + */ + private void decrementWorkerCount() { + do {} while (! compareAndDecrementWorkerCount(ctl.get())); + } + + /** + * The queue used for holding tasks and handing off to worker + * threads. We do not require that workQueue.poll() returning + * null necessarily means that workQueue.isEmpty(), so rely + * solely on isEmpty to see if the queue is empty (which we must + * do for example when deciding whether to transition from + * SHUTDOWN to TIDYING). This accommodates special-purpose + * queues such as DelayQueues for which poll() is allowed to + * return null even if it may later return non-null when delays + * expire. + */ + private final BlockingQueue workQueue; + + /** + * Lock held on access to workers set and related bookkeeping. + * While we could use a concurrent set of some sort, it turns out + * to be generally preferable to use a lock. Among the reasons is + * that this serializes interruptIdleWorkers, which avoids + * unnecessary interrupt storms, especially during shutdown. + * Otherwise exiting threads would concurrently interrupt those + * that have not yet interrupted. It also simplifies some of the + * associated statistics bookkeeping of largestPoolSize etc. We + * also hold mainLock on shutdown and shutdownNow, for the sake of + * ensuring workers set is stable while separately checking + * permission to interrupt and actually interrupting. + */ + private final ReentrantLock mainLock = new ReentrantLock(); + + /** + * Set containing all worker threads in pool. Accessed only when + * holding mainLock. + */ + private final HashSet workers = new HashSet(); + + /** + * Wait condition to support awaitTermination + */ + private final Condition termination = mainLock.newCondition(); + + /** + * Tracks largest attained pool size. Accessed only under + * mainLock. + */ + private int largestPoolSize; + + /** + * Counter for completed tasks. Updated only on termination of + * worker threads. Accessed only under mainLock. + */ + private long completedTaskCount; + + /* + * All user control parameters are declared as volatiles so that + * ongoing actions are based on freshest values, but without need + * for locking, since no internal invariants depend on them + * changing synchronously with respect to other actions. + */ + + /** + * Factory for new threads. All threads are created using this + * factory (via method addWorker). All callers must be prepared + * for addWorker to fail, which may reflect a system or user's + * policy limiting the number of threads. Even though it is not + * treated as an error, failure to create threads may result in + * new tasks being rejected or existing ones remaining stuck in + * the queue. + * + * We go further and preserve pool invariants even in the face of + * errors such as OutOfMemoryError, that might be thrown while + * trying to create threads. Such errors are rather common due to + * the need to allocate a native stack in Thread.start, and users + * will want to perform clean pool shutdown to clean up. There + * will likely be enough memory available for the cleanup code to + * complete without encountering yet another OutOfMemoryError. + */ + private volatile ThreadFactory threadFactory; + + /** + * Handler called when saturated or shutdown in execute. + */ + private volatile RejectedExecutionHandler handler; + + /** + * Timeout in nanoseconds for idle threads waiting for work. + * Threads use this timeout when there are more than corePoolSize + * present or if allowCoreThreadTimeOut. Otherwise they wait + * forever for new work. + */ + private volatile long keepAliveTime; + + /** + * If false (default), core threads stay alive even when idle. + * If true, core threads use keepAliveTime to time out waiting + * for work. + */ + private volatile boolean allowCoreThreadTimeOut; + + /** + * Core pool size is the minimum number of workers to keep alive + * (and not allow to time out etc) unless allowCoreThreadTimeOut + * is set, in which case the minimum is zero. + */ + private volatile int corePoolSize; + + /** + * Maximum pool size. Note that the actual maximum is internally + * bounded by CAPACITY. + */ + private volatile int maximumPoolSize; + + /** + * The default rejected execution handler + */ + private static final RejectedExecutionHandler defaultHandler = + new AbortPolicy(); + + /** + * Permission required for callers of shutdown and shutdownNow. + * We additionally require (see checkShutdownAccess) that callers + * have permission to actually interrupt threads in the worker set + * (as governed by Thread.interrupt, which relies on + * ThreadGroup.checkAccess, which in turn relies on + * SecurityManager.checkAccess). Shutdowns are attempted only if + * these checks pass. + * + * All actual invocations of Thread.interrupt (see + * interruptIdleWorkers and interruptWorkers) ignore + * SecurityExceptions, meaning that the attempted interrupts + * silently fail. In the case of shutdown, they should not fail + * unless the SecurityManager has inconsistent policies, sometimes + * allowing access to a thread and sometimes not. In such cases, + * failure to actually interrupt threads may disable or delay full + * termination. Other uses of interruptIdleWorkers are advisory, + * and failure to actually interrupt will merely delay response to + * configuration changes so is not handled exceptionally. + */ + private static final RuntimePermission shutdownPerm = + new RuntimePermission("modifyThread"); + + /** + * Class Worker mainly maintains interrupt control state for + * threads running tasks, along with other minor bookkeeping. + * This class opportunistically extends AbstractQueuedSynchronizer + * to simplify acquiring and releasing a lock surrounding each + * task execution. This protects against interrupts that are + * intended to wake up a worker thread waiting for a task from + * instead interrupting a task being run. We implement a simple + * non-reentrant mutual exclusion lock rather than use + * ReentrantLock because we do not want worker tasks to be able to + * reacquire the lock when they invoke pool control methods like + * setCorePoolSize. Additionally, to suppress interrupts until + * the thread actually starts running tasks, we initialize lock + * state to a negative value, and clear it upon start (in + * runWorker). + */ + private final class Worker + extends AbstractQueuedSynchronizer + implements Runnable + { + /** + * This class will never be serialized, but we provide a + * serialVersionUID to suppress a javac warning. + */ + private static final long serialVersionUID = 6138294804551838833L; + + /** Thread this worker is running in. Null if factory fails. */ + final Thread thread; + /** Initial task to run. Possibly null. */ + Runnable firstTask; + /** Per-thread task counter */ + volatile long completedTasks; + + /** + * Creates with given first task and thread from ThreadFactory. + * @param firstTask the first task (null if none) + */ + Worker(Runnable firstTask) { + setState(-1); // inhibit interrupts until runWorker + this.firstTask = firstTask; + this.thread = getThreadFactory().newThread(this); + } + + /** Delegates main run loop to outer runWorker */ + public void run() { + runWorker(this); + } + + // Lock methods + // + // The value 0 represents the unlocked state. + // The value 1 represents the locked state. + + protected boolean isHeldExclusively() { + return getState() != 0; + } + + protected boolean tryAcquire(int unused) { + if (compareAndSetState(0, 1)) { + setExclusiveOwnerThread(Thread.currentThread()); + return true; + } + return false; + } + + protected boolean tryRelease(int unused) { + setExclusiveOwnerThread(null); + setState(0); + return true; + } + + public void lock() { acquire(1); } + public boolean tryLock() { return tryAcquire(1); } + public void unlock() { release(1); } + public boolean isLocked() { return isHeldExclusively(); } + + void interruptIfStarted() { + Thread t; + if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { + try { + t.interrupt(); + } catch (SecurityException ignore) { + } + } + } + } + + /* + * Methods for setting control state + */ + + /** + * Transitions runState to given target, or leaves it alone if + * already at least the given target. + * + * @param targetState the desired state, either SHUTDOWN or STOP + * (but not TIDYING or TERMINATED -- use tryTerminate for that) + */ + private void advanceRunState(int targetState) { + for (;;) { + int c = ctl.get(); + if (runStateAtLeast(c, targetState) || + ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) + break; + } + } + + /** + * Transitions to TERMINATED state if either (SHUTDOWN and pool + * and queue empty) or (STOP and pool empty). If otherwise + * eligible to terminate but workerCount is nonzero, interrupts an + * idle worker to ensure that shutdown signals propagate. This + * method must be called following any action that might make + * termination possible -- reducing worker count or removing tasks + * from the queue during shutdown. The method is non-private to + * allow access from ScheduledThreadPoolExecutor. + */ + final void tryTerminate() { + for (;;) { + int c = ctl.get(); + if (isRunning(c) || + runStateAtLeast(c, TIDYING) || + (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) + return; + if (workerCountOf(c) != 0) { // Eligible to terminate + interruptIdleWorkers(ONLY_ONE); + return; + } + + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { + try { + terminated(); + } finally { + ctl.set(ctlOf(TERMINATED, 0)); + termination.signalAll(); + } + return; + } + } finally { + mainLock.unlock(); + } + // else retry on failed CAS + } + } + + /* + * Methods for controlling interrupts to worker threads. + */ + + /** + * If there is a security manager, makes sure caller has + * permission to shut down threads in general (see shutdownPerm). + * If this passes, additionally makes sure the caller is allowed + * to interrupt each worker thread. This might not be true even if + * first check passed, if the SecurityManager treats some threads + * specially. + */ + private void checkShutdownAccess() { + SecurityManager security = System.getSecurityManager(); + if (security != null) { + security.checkPermission(shutdownPerm); + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + for (Worker w : workers) + security.checkAccess(w.thread); + } finally { + mainLock.unlock(); + } + } + } + + /** + * Interrupts all threads, even if active. Ignores SecurityExceptions + * (in which case some threads may remain uninterrupted). + */ + private void interruptWorkers() { + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + for (Worker w : workers) + w.interruptIfStarted(); + } finally { + mainLock.unlock(); + } + } + + /** + * Interrupts threads that might be waiting for tasks (as + * indicated by not being locked) so they can check for + * termination or configuration changes. Ignores + * SecurityExceptions (in which case some threads may remain + * uninterrupted). + * + * @param onlyOne If true, interrupt at most one worker. This is + * called only from tryTerminate when termination is otherwise + * enabled but there are still other workers. In this case, at + * most one waiting worker is interrupted to propagate shutdown + * signals in case all threads are currently waiting. + * Interrupting any arbitrary thread ensures that newly arriving + * workers since shutdown began will also eventually exit. + * To guarantee eventual termination, it suffices to always + * interrupt only one idle worker, but shutdown() interrupts all + * idle workers so that redundant workers exit promptly, not + * waiting for a straggler task to finish. + */ + private void interruptIdleWorkers(boolean onlyOne) { + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + for (Worker w : workers) { + Thread t = w.thread; + if (!t.isInterrupted() && w.tryLock()) { + try { + t.interrupt(); + } catch (SecurityException ignore) { + } finally { + w.unlock(); + } + } + if (onlyOne) + break; + } + } finally { + mainLock.unlock(); + } + } + + /** + * Common form of interruptIdleWorkers, to avoid having to + * remember what the boolean argument means. + */ + private void interruptIdleWorkers() { + interruptIdleWorkers(false); + } + + private static final boolean ONLY_ONE = true; + + /* + * Misc utilities, most of which are also exported to + * ScheduledThreadPoolExecutor + */ + + /** + * Invokes the rejected execution handler for the given command. + * Package-protected for use by ScheduledThreadPoolExecutor. + */ + final void reject(Runnable command) { + handler.rejectedExecution(command, this); + } + + /** + * Performs any further cleanup following run state transition on + * invocation of shutdown. A no-op here, but used by + * ScheduledThreadPoolExecutor to cancel delayed tasks. + */ + void onShutdown() { + } + + /** + * State check needed by ScheduledThreadPoolExecutor to + * enable running tasks during shutdown. + * + * @param shutdownOK true if should return true if SHUTDOWN + */ + final boolean isRunningOrShutdown(boolean shutdownOK) { + int rs = runStateOf(ctl.get()); + return rs == RUNNING || (rs == SHUTDOWN && shutdownOK); + } + + /** + * Drains the task queue into a new list, normally using + * drainTo. But if the queue is a DelayQueue or any other kind of + * queue for which poll or drainTo may fail to remove some + * elements, it deletes them one by one. + */ + private List drainQueue() { + BlockingQueue q = workQueue; + ArrayList taskList = new ArrayList(); + q.drainTo(taskList); + if (!q.isEmpty()) { + for (Runnable r : q.toArray(new Runnable[0])) { + if (q.remove(r)) + taskList.add(r); + } + } + return taskList; + } + + /* + * Methods for creating, running and cleaning up after workers + */ + + /** + * Checks if a new worker can be added with respect to current + * pool state and the given bound (either core or maximum). If so, + * the worker count is adjusted accordingly, and, if possible, a + * new worker is created and started, running firstTask as its + * first task. This method returns false if the pool is stopped or + * eligible to shut down. It also returns false if the thread + * factory fails to create a thread when asked. If the thread + * creation fails, either due to the thread factory returning + * null, or due to an exception (typically OutOfMemoryError in + * Thread.start()), we roll back cleanly. + * + * @param firstTask the task the new thread should run first (or + * null if none). Workers are created with an initial first task + * (in method execute()) to bypass queuing when there are fewer + * than corePoolSize threads (in which case we always start one), + * or when the queue is full (in which case we must bypass queue). + * Initially idle threads are usually created via + * prestartCoreThread or to replace other dying workers. + * + * @param core if true use corePoolSize as bound, else + * maximumPoolSize. (A boolean indicator is used here rather than a + * value to ensure reads of fresh values after checking other pool + * state). + * @return true if successful + */ + private boolean addWorker(Runnable firstTask, boolean core) { + retry: + for (;;) { + int c = ctl.get(); + int rs = runStateOf(c); + + // Check if queue empty only if necessary. + if (rs >= SHUTDOWN && + ! (rs == SHUTDOWN && + firstTask == null && + ! workQueue.isEmpty())) + return false; + + for (;;) { + int wc = workerCountOf(c); + if (wc >= CAPACITY || + wc >= (core ? corePoolSize : maximumPoolSize)) + return false; + if (compareAndIncrementWorkerCount(c)) + break retry; + c = ctl.get(); // Re-read ctl + if (runStateOf(c) != rs) + continue retry; + // else CAS failed due to workerCount change; retry inner loop + } + } + + boolean workerStarted = false; + boolean workerAdded = false; + Worker w = null; + try { + w = new Worker(firstTask); + final Thread t = w.thread; + if (t != null) { + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + // Recheck while holding lock. + // Back out on ThreadFactory failure or if + // shut down before lock acquired. + int rs = runStateOf(ctl.get()); + + if (rs < SHUTDOWN || + (rs == SHUTDOWN && firstTask == null)) { + if (t.isAlive()) // precheck that t is startable + throw new IllegalThreadStateException(); + workers.add(w); + int s = workers.size(); + if (s > largestPoolSize) + largestPoolSize = s; + workerAdded = true; + } + } finally { + mainLock.unlock(); + } + if (workerAdded) { + t.start(); + workerStarted = true; + } + } + } finally { + if (! workerStarted) + addWorkerFailed(w); + } + return workerStarted; + } + + /** + * Rolls back the worker thread creation. + * - removes worker from workers, if present + * - decrements worker count + * - rechecks for termination, in case the existence of this + * worker was holding up termination + */ + private void addWorkerFailed(Worker w) { + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + if (w != null) + workers.remove(w); + decrementWorkerCount(); + tryTerminate(); + } finally { + mainLock.unlock(); + } + } + + /** + * Performs cleanup and bookkeeping for a dying worker. Called + * only from worker threads. Unless completedAbruptly is set, + * assumes that workerCount has already been adjusted to account + * for exit. This method removes thread from worker set, and + * possibly terminates the pool or replaces the worker if either + * it exited due to user task exception or if fewer than + * corePoolSize workers are running or queue is non-empty but + * there are no workers. + * + * @param w the worker + * @param completedAbruptly if the worker died due to user exception + */ + private void processWorkerExit(Worker w, boolean completedAbruptly) { + if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted + decrementWorkerCount(); + + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + completedTaskCount += w.completedTasks; + workers.remove(w); + } finally { + mainLock.unlock(); + } + + tryTerminate(); + + int c = ctl.get(); + if (runStateLessThan(c, STOP)) { + if (!completedAbruptly) { + int min = allowCoreThreadTimeOut ? 0 : corePoolSize; + if (min == 0 && ! workQueue.isEmpty()) + min = 1; + if (workerCountOf(c) >= min) + return; // replacement not needed + } + addWorker(null, false); + } + } + + /** + * Performs blocking or timed wait for a task, depending on + * current configuration settings, or returns null if this worker + * must exit because of any of: + * 1. There are more than maximumPoolSize workers (due to + * a call to setMaximumPoolSize). + * 2. The pool is stopped. + * 3. The pool is shutdown and the queue is empty. + * 4. This worker timed out waiting for a task, and timed-out + * workers are subject to termination (that is, + * {@code allowCoreThreadTimeOut || workerCount > corePoolSize}) + * both before and after the timed wait, and if the queue is + * non-empty, this worker is not the last thread in the pool. + * + * @return task, or null if the worker must exit, in which case + * workerCount is decremented + */ + private Runnable getTask() { + boolean timedOut = false; // Did the last poll() time out? + + for (;;) { + int c = ctl.get(); + int rs = runStateOf(c); + + // Check if queue empty only if necessary. + if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { + decrementWorkerCount(); + return null; + } + + int wc = workerCountOf(c); + + // Are workers subject to culling? + boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; + + if ((wc > maximumPoolSize || (timed && timedOut)) + && (wc > 1 || workQueue.isEmpty())) { + if (compareAndDecrementWorkerCount(c)) + return null; + continue; + } + + try { + Runnable r = timed ? + workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : + workQueue.take(); + if (r != null) + return r; + timedOut = true; + } catch (InterruptedException retry) { + timedOut = false; + } + } + } + + /** + * Main worker run loop. Repeatedly gets tasks from queue and + * executes them, while coping with a number of issues: + * + * 1. We may start out with an initial task, in which case we + * don't need to get the first one. Otherwise, as long as pool is + * running, we get tasks from getTask. If it returns null then the + * worker exits due to changed pool state or configuration + * parameters. Other exits result from exception throws in + * external code, in which case completedAbruptly holds, which + * usually leads processWorkerExit to replace this thread. + * + * 2. Before running any task, the lock is acquired to prevent + * other pool interrupts while the task is executing, and then we + * ensure that unless pool is stopping, this thread does not have + * its interrupt set. + * + * 3. Each task run is preceded by a call to beforeExecute, which + * might throw an exception, in which case we cause thread to die + * (breaking loop with completedAbruptly true) without processing + * the task. + * + * 4. Assuming beforeExecute completes normally, we run the task, + * gathering any of its thrown exceptions to send to afterExecute. + * We separately handle RuntimeException, Error (both of which the + * specs guarantee that we trap) and arbitrary Throwables. + * Because we cannot rethrow Throwables within Runnable.run, we + * wrap them within Errors on the way out (to the thread's + * UncaughtExceptionHandler). Any thrown exception also + * conservatively causes thread to die. + * + * 5. After task.run completes, we call afterExecute, which may + * also throw an exception, which will also cause thread to + * die. According to JLS Sec 14.20, this exception is the one that + * will be in effect even if task.run throws. + * + * The net effect of the exception mechanics is that afterExecute + * and the thread's UncaughtExceptionHandler have as accurate + * information as we can provide about any problems encountered by + * user code. + * + * @param w the worker + */ + final void runWorker(Worker w) { + Thread wt = Thread.currentThread(); + Runnable task = w.firstTask; + w.firstTask = null; + w.unlock(); // allow interrupts + boolean completedAbruptly = true; + try { + while (task != null || (task = getTask()) != null) { + w.lock(); + // If pool is stopping, ensure thread is interrupted; + // if not, ensure thread is not interrupted. This + // requires a recheck in second case to deal with + // shutdownNow race while clearing interrupt + if ((runStateAtLeast(ctl.get(), STOP) || + (Thread.interrupted() && + runStateAtLeast(ctl.get(), STOP))) && + !wt.isInterrupted()) + wt.interrupt(); + try { + beforeExecute(wt, task); + Throwable thrown = null; + try { + task.run(); + } catch (RuntimeException x) { + thrown = x; throw x; + } catch (Error x) { + thrown = x; throw x; + } catch (Throwable x) { + thrown = x; throw new Error(x); + } finally { + afterExecute(task, thrown); + } + } finally { + task = null; + w.completedTasks++; + w.unlock(); + } + } + completedAbruptly = false; + } finally { + processWorkerExit(w, completedAbruptly); + } + } + + // Public constructors and methods + + /** + * Creates a new {@code ThreadPoolExecutor} with the given initial + * parameters and default thread factory and rejected execution handler. + * It may be more convenient to use one of the {@link Executors} factory + * methods instead of this general purpose constructor. + * + * @param corePoolSize the number of threads to keep in the pool, even + * if they are idle, unless {@code allowCoreThreadTimeOut} is set + * @param maximumPoolSize the maximum number of threads to allow in the + * pool + * @param keepAliveTime when the number of threads is greater than + * the core, this is the maximum time that excess idle threads + * will wait for new tasks before terminating. + * @param unit the time unit for the {@code keepAliveTime} argument + * @param workQueue the queue to use for holding tasks before they are + * executed. This queue will hold only the {@code Runnable} + * tasks submitted by the {@code execute} method. + * @throws IllegalArgumentException if one of the following holds:
+ * {@code corePoolSize < 0}
+ * {@code keepAliveTime < 0}
+ * {@code maximumPoolSize <= 0}
+ * {@code maximumPoolSize < corePoolSize} + * @throws NullPointerException if {@code workQueue} is null + */ + public SuspendableThreadPoolExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, + Executors.defaultThreadFactory(), defaultHandler); + } + + /** + * Creates a new {@code ThreadPoolExecutor} with the given initial + * parameters and default rejected execution handler. + * + * @param corePoolSize the number of threads to keep in the pool, even + * if they are idle, unless {@code allowCoreThreadTimeOut} is set + * @param maximumPoolSize the maximum number of threads to allow in the + * pool + * @param keepAliveTime when the number of threads is greater than + * the core, this is the maximum time that excess idle threads + * will wait for new tasks before terminating. + * @param unit the time unit for the {@code keepAliveTime} argument + * @param workQueue the queue to use for holding tasks before they are + * executed. This queue will hold only the {@code Runnable} + * tasks submitted by the {@code execute} method. + * @param threadFactory the factory to use when the executor + * creates a new thread + * @throws IllegalArgumentException if one of the following holds:
+ * {@code corePoolSize < 0}
+ * {@code keepAliveTime < 0}
+ * {@code maximumPoolSize <= 0}
+ * {@code maximumPoolSize < corePoolSize} + * @throws NullPointerException if {@code workQueue} + * or {@code threadFactory} is null + */ + public SuspendableThreadPoolExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + ThreadFactory threadFactory) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, + threadFactory, defaultHandler); + } + + /** + * Creates a new {@code ThreadPoolExecutor} with the given initial + * parameters and default thread factory. + * + * @param corePoolSize the number of threads to keep in the pool, even + * if they are idle, unless {@code allowCoreThreadTimeOut} is set + * @param maximumPoolSize the maximum number of threads to allow in the + * pool + * @param keepAliveTime when the number of threads is greater than + * the core, this is the maximum time that excess idle threads + * will wait for new tasks before terminating. + * @param unit the time unit for the {@code keepAliveTime} argument + * @param workQueue the queue to use for holding tasks before they are + * executed. This queue will hold only the {@code Runnable} + * tasks submitted by the {@code execute} method. + * @param handler the handler to use when execution is blocked + * because the thread bounds and queue capacities are reached + * @throws IllegalArgumentException if one of the following holds:
+ * {@code corePoolSize < 0}
+ * {@code keepAliveTime < 0}
+ * {@code maximumPoolSize <= 0}
+ * {@code maximumPoolSize < corePoolSize} + * @throws NullPointerException if {@code workQueue} + * or {@code handler} is null + */ + public SuspendableThreadPoolExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + RejectedExecutionHandler handler) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, + Executors.defaultThreadFactory(), handler); + } + + /** + * Creates a new {@code ThreadPoolExecutor} with the given initial + * parameters. + * + * @param corePoolSize the number of threads to keep in the pool, even + * if they are idle, unless {@code allowCoreThreadTimeOut} is set + * @param maximumPoolSize the maximum number of threads to allow in the + * pool + * @param keepAliveTime when the number of threads is greater than + * the core, this is the maximum time that excess idle threads + * will wait for new tasks before terminating. + * @param unit the time unit for the {@code keepAliveTime} argument + * @param workQueue the queue to use for holding tasks before they are + * executed. This queue will hold only the {@code Runnable} + * tasks submitted by the {@code execute} method. + * @param threadFactory the factory to use when the executor + * creates a new thread + * @param handler the handler to use when execution is blocked + * because the thread bounds and queue capacities are reached + * @throws IllegalArgumentException if one of the following holds:
+ * {@code corePoolSize < 0}
+ * {@code keepAliveTime < 0}
+ * {@code maximumPoolSize <= 0}
+ * {@code maximumPoolSize < corePoolSize} + * @throws NullPointerException if {@code workQueue} + * or {@code threadFactory} or {@code handler} is null + */ + public SuspendableThreadPoolExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + if (corePoolSize < 0 || + maximumPoolSize <= 0 || + maximumPoolSize < corePoolSize || + keepAliveTime < 0) + throw new IllegalArgumentException(); + if (workQueue == null || threadFactory == null || handler == null) + throw new NullPointerException(); + this.corePoolSize = corePoolSize; + this.maximumPoolSize = maximumPoolSize; + this.workQueue = workQueue; + this.keepAliveTime = unit.toNanos(keepAliveTime); + this.threadFactory = threadFactory; + this.handler = handler; + } + + /** + * Executes the given task sometime in the future. The task + * may execute in a new thread or in an existing pooled thread. + * + * If the task cannot be submitted for execution, either because this + * executor has been shutdown or because its capacity has been reached, + * the task is handled by the current {@code RejectedExecutionHandler}. + * + * @param command the task to execute + * @throws RejectedExecutionException at discretion of + * {@code RejectedExecutionHandler}, if the task + * cannot be accepted for execution + * @throws NullPointerException if {@code command} is null + */ + public void execute(Runnable command) { + if (command == null) + throw new NullPointerException(); + /* + * Proceed in 3 steps: + * + * 1. If fewer than corePoolSize threads are running, try to + * start a new thread with the given command as its first + * task. The call to addWorker atomically checks runState and + * workerCount, and so prevents false alarms that would add + * threads when it shouldn't, by returning false. + * + * 2. If a task can be successfully queued, then we still need + * to double-check whether we should have added a thread + * (because existing ones died since last checking) or that + * the pool shut down since entry into this method. So we + * recheck state and if necessary roll back the enqueuing if + * stopped, or start a new thread if there are none. + * + * 3. If we cannot queue task, then we try to add a new + * thread. If it fails, we know we are shut down or saturated + * and so reject the task. + */ + int c = ctl.get(); + if (workerCountOf(c) < corePoolSize) { + if (addWorker(command, true)) + return; + c = ctl.get(); + } + if (isRunning(c) && workQueue.offer(command)) { + int recheck = ctl.get(); + if (! isRunning(recheck) && remove(command)) + reject(command); + else if (workerCountOf(recheck) == 0) + addWorker(null, false); + } + else if (!addWorker(command, false)) + reject(command); + } + + /** + * Initiates an orderly shutdown in which previously submitted + * tasks are executed, but no new tasks will be accepted. + * Invocation has no additional effect if already shut down. + * + *

This method does not wait for previously submitted tasks to + * complete execution. Use {@link #awaitTermination awaitTermination} + * to do that. + * + * @throws SecurityException {@inheritDoc} + */ + public void shutdown() { + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + checkShutdownAccess(); + advanceRunState(SHUTDOWN); + interruptIdleWorkers(); + onShutdown(); // hook for ScheduledThreadPoolExecutor + } finally { + mainLock.unlock(); + } + tryTerminate(); + } + + /** + * Attempts to stop all actively executing tasks, halts the + * processing of waiting tasks, and returns a list of the tasks + * that were awaiting execution. These tasks are drained (removed) + * from the task queue upon return from this method. + * + *

This method does not wait for actively executing tasks to + * terminate. Use {@link #awaitTermination awaitTermination} to + * do that. + * + *

There are no guarantees beyond best-effort attempts to stop + * processing actively executing tasks. This implementation + * cancels tasks via {@link Thread#interrupt}, so any task that + * fails to respond to interrupts may never terminate. + * + * @throws SecurityException {@inheritDoc} + */ + public List shutdownNow() { + List tasks; + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + checkShutdownAccess(); + advanceRunState(STOP); + interruptWorkers(); + tasks = drainQueue(); + } finally { + mainLock.unlock(); + } + tryTerminate(); + return tasks; + } + + public boolean isShutdown() { + return ! isRunning(ctl.get()); + } + + /** + * Returns true if this executor is in the process of terminating + * after {@link #shutdown} or {@link #shutdownNow} but has not + * completely terminated. This method may be useful for + * debugging. A return of {@code true} reported a sufficient + * period after shutdown may indicate that submitted tasks have + * ignored or suppressed interruption, causing this executor not + * to properly terminate. + * + * @return {@code true} if terminating but not yet terminated + */ + public boolean isTerminating() { + int c = ctl.get(); + return ! isRunning(c) && runStateLessThan(c, TERMINATED); + } + + public boolean isTerminated() { + return runStateAtLeast(ctl.get(), TERMINATED); + } + + public boolean awaitTermination(long timeout, TimeUnit unit) + throws InterruptedException { + long nanos = unit.toNanos(timeout); + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + for (;;) { + if (runStateAtLeast(ctl.get(), TERMINATED)) + return true; + if (nanos <= 0) + return false; + nanos = termination.awaitNanos(nanos); + } + } finally { + mainLock.unlock(); + } + } + + /** + * Invokes {@code shutdown} when this executor is no longer + * referenced and it has no threads. + */ + protected void finalize() { + shutdown(); + } + + /** + * Sets the thread factory used to create new threads. + * + * @param threadFactory the new thread factory + * @throws NullPointerException if threadFactory is null + * @see #getThreadFactory + */ + public void setThreadFactory(ThreadFactory threadFactory) { + if (threadFactory == null) + throw new NullPointerException(); + this.threadFactory = threadFactory; + } + + /** + * Returns the thread factory used to create new threads. + * + * @return the current thread factory + * @see #setThreadFactory(ThreadFactory) + */ + public ThreadFactory getThreadFactory() { + return threadFactory; + } + + /** + * Sets a new handler for unexecutable tasks. + * + * @param handler the new handler + * @throws NullPointerException if handler is null + * @see #getRejectedExecutionHandler + */ + public void setRejectedExecutionHandler(RejectedExecutionHandler handler) { + if (handler == null) + throw new NullPointerException(); + this.handler = handler; + } + + /** + * Returns the current handler for unexecutable tasks. + * + * @return the current handler + * @see #setRejectedExecutionHandler(RejectedExecutionHandler) + */ + public RejectedExecutionHandler getRejectedExecutionHandler() { + return handler; + } + + /** + * Sets the core number of threads. This overrides any value set + * in the constructor. If the new value is smaller than the + * current value, excess existing threads will be terminated when + * they next become idle. If larger, new threads will, if needed, + * be started to execute any queued tasks. + * + * @param corePoolSize the new core size + * @throws IllegalArgumentException if {@code corePoolSize < 0} + * @see #getCorePoolSize + */ + public void setCorePoolSize(int corePoolSize) { + if (corePoolSize < 0) + throw new IllegalArgumentException(); + int delta = corePoolSize - this.corePoolSize; + this.corePoolSize = corePoolSize; + if (workerCountOf(ctl.get()) > corePoolSize) + interruptIdleWorkers(); + else if (delta > 0) { + // We don't really know how many new threads are "needed". + // As a heuristic, prestart enough new workers (up to new + // core size) to handle the current number of tasks in + // queue, but stop if queue becomes empty while doing so. + int k = Math.min(delta, workQueue.size()); + while (k-- > 0 && addWorker(null, true)) { + if (workQueue.isEmpty()) + break; + } + } + } + + /** + * Returns the core number of threads. + * + * @return the core number of threads + * @see #setCorePoolSize + */ + public int getCorePoolSize() { + return corePoolSize; + } + + /** + * Starts a core thread, causing it to idly wait for work. This + * overrides the default policy of starting core threads only when + * new tasks are executed. This method will return {@code false} + * if all core threads have already been started. + * + * @return {@code true} if a thread was started + */ + public boolean prestartCoreThread() { + return workerCountOf(ctl.get()) < corePoolSize && + addWorker(null, true); + } + + /** + * Same as prestartCoreThread except arranges that at least one + * thread is started even if corePoolSize is 0. + */ + void ensurePrestart() { + int wc = workerCountOf(ctl.get()); + if (wc < corePoolSize) + addWorker(null, true); + else if (wc == 0) + addWorker(null, false); + } + + /** + * Starts all core threads, causing them to idly wait for work. This + * overrides the default policy of starting core threads only when + * new tasks are executed. + * + * @return the number of threads started + */ + public int prestartAllCoreThreads() { + int n = 0; + while (addWorker(null, true)) + ++n; + return n; + } + + /** + * Returns true if this pool allows core threads to time out and + * terminate if no tasks arrive within the keepAlive time, being + * replaced if needed when new tasks arrive. When true, the same + * keep-alive policy applying to non-core threads applies also to + * core threads. When false (the default), core threads are never + * terminated due to lack of incoming tasks. + * + * @return {@code true} if core threads are allowed to time out, + * else {@code false} + * + * @since 1.6 + */ + public boolean allowsCoreThreadTimeOut() { + return allowCoreThreadTimeOut; + } + + /** + * Sets the policy governing whether core threads may time out and + * terminate if no tasks arrive within the keep-alive time, being + * replaced if needed when new tasks arrive. When false, core + * threads are never terminated due to lack of incoming + * tasks. When true, the same keep-alive policy applying to + * non-core threads applies also to core threads. To avoid + * continual thread replacement, the keep-alive time must be + * greater than zero when setting {@code true}. This method + * should in general be called before the pool is actively used. + * + * @param value {@code true} if should time out, else {@code false} + * @throws IllegalArgumentException if value is {@code true} + * and the current keep-alive time is not greater than zero + * + * @since 1.6 + */ + public void allowCoreThreadTimeOut(boolean value) { + if (value && keepAliveTime <= 0) + throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); + if (value != allowCoreThreadTimeOut) { + allowCoreThreadTimeOut = value; + if (value) + interruptIdleWorkers(); + } + } + + /** + * Sets the maximum allowed number of threads. This overrides any + * value set in the constructor. If the new value is smaller than + * the current value, excess existing threads will be + * terminated when they next become idle. + * + * @param maximumPoolSize the new maximum + * @throws IllegalArgumentException if the new maximum is + * less than or equal to zero, or + * less than the {@linkplain #getCorePoolSize core pool size} + * @see #getMaximumPoolSize + */ + public void setMaximumPoolSize(int maximumPoolSize) { + if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) + throw new IllegalArgumentException(); + this.maximumPoolSize = maximumPoolSize; + if (workerCountOf(ctl.get()) > maximumPoolSize) + interruptIdleWorkers(); + } + + /** + * Returns the maximum allowed number of threads. + * + * @return the maximum allowed number of threads + * @see #setMaximumPoolSize + */ + public int getMaximumPoolSize() { + return maximumPoolSize; + } + + /** + * Sets the time limit for which threads may remain idle before + * being terminated. If there are more than the core number of + * threads currently in the pool, after waiting this amount of + * time without processing a task, excess threads will be + * terminated. This overrides any value set in the constructor. + * + * @param time the time to wait. A time value of zero will cause + * excess threads to terminate immediately after executing tasks. + * @param unit the time unit of the {@code time} argument + * @throws IllegalArgumentException if {@code time} less than zero or + * if {@code time} is zero and {@code allowsCoreThreadTimeOut} + * @see #getKeepAliveTime(TimeUnit) + */ + public void setKeepAliveTime(long time, TimeUnit unit) { + if (time < 0) + throw new IllegalArgumentException(); + if (time == 0 && allowsCoreThreadTimeOut()) + throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); + long keepAliveTime = unit.toNanos(time); + long delta = keepAliveTime - this.keepAliveTime; + this.keepAliveTime = keepAliveTime; + if (delta < 0) + interruptIdleWorkers(); + } + + /** + * Returns the thread keep-alive time, which is the amount of time + * that threads in excess of the core pool size may remain + * idle before being terminated. + * + * @param unit the desired time unit of the result + * @return the time limit + * @see #setKeepAliveTime(long, TimeUnit) + */ + public long getKeepAliveTime(TimeUnit unit) { + return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS); + } + + /* User-level queue utilities */ + + /** + * Returns the task queue used by this executor. Access to the + * task queue is intended primarily for debugging and monitoring. + * This queue may be in active use. Retrieving the task queue + * does not prevent queued tasks from executing. + * + * @return the task queue + */ + public BlockingQueue getQueue() { + return workQueue; + } + + /** + * Removes this task from the executor's internal queue if it is + * present, thus causing it not to be run if it has not already + * started. + * + *

This method may be useful as one part of a cancellation + * scheme. It may fail to remove tasks that have been converted + * into other forms before being placed on the internal queue. For + * example, a task entered using {@code submit} might be + * converted into a form that maintains {@code Future} status. + * However, in such cases, method {@link #purge} may be used to + * remove those Futures that have been cancelled. + * + * @param task the task to remove + * @return {@code true} if the task was removed + */ + public boolean remove(Runnable task) { + boolean removed = workQueue.remove(task); + tryTerminate(); // In case SHUTDOWN and now empty + return removed; + } + + /** + * Tries to remove from the work queue all {@link Future} + * tasks that have been cancelled. This method can be useful as a + * storage reclamation operation, that has no other impact on + * functionality. Cancelled tasks are never executed, but may + * accumulate in work queues until worker threads can actively + * remove them. Invoking this method instead tries to remove them now. + * However, this method may fail to remove tasks in + * the presence of interference by other threads. + */ + public void purge() { + final BlockingQueue q = workQueue; + try { + Iterator it = q.iterator(); + while (it.hasNext()) { + Runnable r = it.next(); + if (r instanceof Future && ((Future)r).isCancelled()) + it.remove(); + } + } catch (ConcurrentModificationException fallThrough) { + // Take slow path if we encounter interference during traversal. + // Make copy for traversal and call remove for cancelled entries. + // The slow path is more likely to be O(N*N). + for (Object r : q.toArray()) + if (r instanceof Future && ((Future)r).isCancelled()) + q.remove(r); + } + + tryTerminate(); // In case SHUTDOWN and now empty + } + + /* Statistics */ + + /** + * Returns the current number of threads in the pool. + * + * @return the number of threads + */ + public int getPoolSize() { + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + // Remove rare and surprising possibility of + // isTerminated() && getPoolSize() > 0 + return runStateAtLeast(ctl.get(), TIDYING) ? 0 + : workers.size(); + } finally { + mainLock.unlock(); + } + } + + /** + * Returns the approximate number of threads that are actively + * executing tasks. + * + * @return the number of threads + */ + public int getActiveCount() { + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + int n = 0; + for (Worker w : workers) + if (w.isLocked()) + ++n; + return n; + } finally { + mainLock.unlock(); + } + } + + /** + * Returns the largest number of threads that have ever + * simultaneously been in the pool. + * + * @return the number of threads + */ + public int getLargestPoolSize() { + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + return largestPoolSize; + } finally { + mainLock.unlock(); + } + } + + /** + * Returns the approximate total number of tasks that have ever been + * scheduled for execution. Because the states of tasks and + * threads may change dynamically during computation, the returned + * value is only an approximation. + * + * @return the number of tasks + */ + public long getTaskCount() { + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + long n = completedTaskCount; + for (Worker w : workers) { + n += w.completedTasks; + if (w.isLocked()) + ++n; + } + return n + workQueue.size(); + } finally { + mainLock.unlock(); + } + } + + /** + * Returns the approximate total number of tasks that have + * completed execution. Because the states of tasks and threads + * may change dynamically during computation, the returned value + * is only an approximation, but one that does not ever decrease + * across successive calls. + * + * @return the number of tasks + */ + public long getCompletedTaskCount() { + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + long n = completedTaskCount; + for (Worker w : workers) + n += w.completedTasks; + return n; + } finally { + mainLock.unlock(); + } + } + + /** + * Returns a string identifying this pool, as well as its state, + * including indications of run state and estimated worker and + * task counts. + * + * @return a string identifying this pool, as well as its state + */ + public String toString() { + long ncompleted; + int nworkers, nactive; + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + ncompleted = completedTaskCount; + nactive = 0; + nworkers = workers.size(); + for (Worker w : workers) { + ncompleted += w.completedTasks; + if (w.isLocked()) + ++nactive; + } + } finally { + mainLock.unlock(); + } + int c = ctl.get(); + String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" : + (runStateAtLeast(c, TERMINATED) ? "Terminated" : + "Shutting down")); + return super.toString() + + "[" + rs + + ", pool size = " + nworkers + + ", active threads = " + nactive + + ", queued tasks = " + workQueue.size() + + ", completed tasks = " + ncompleted + + "]"; + } + + /* Extension hooks */ + + /** + * Method invoked prior to executing the given Runnable in the + * given thread. This method is invoked by thread {@code t} that + * will execute task {@code r}, and may be used to re-initialize + * ThreadLocals, or to perform logging. + * + *

This implementation does nothing, but may be customized in + * subclasses. Note: To properly nest multiple overridings, subclasses + * should generally invoke {@code super.beforeExecute} at the end of + * this method. + * + * @param t the thread that will run task {@code r} + * @param r the task that will be executed + */ + protected void beforeExecute(Thread t, Runnable r) { } + + /** + * Method invoked upon completion of execution of the given Runnable. + * This method is invoked by the thread that executed the task. If + * non-null, the Throwable is the uncaught {@code RuntimeException} + * or {@code Error} that caused execution to terminate abruptly. + * + *

This implementation does nothing, but may be customized in + * subclasses. Note: To properly nest multiple overridings, subclasses + * should generally invoke {@code super.afterExecute} at the + * beginning of this method. + * + *

Note: When actions are enclosed in tasks (such as + * {@link FutureTask}) either explicitly or via methods such as + * {@code submit}, these task objects catch and maintain + * computational exceptions, and so they do not cause abrupt + * termination, and the internal exceptions are not + * passed to this method. If you would like to trap both kinds of + * failures in this method, you can further probe for such cases, + * as in this sample subclass that prints either the direct cause + * or the underlying exception if a task has been aborted: + * + *

 {@code
+     * class ExtendedExecutor extends ThreadPoolExecutor {
+     *   // ...
+     *   protected void afterExecute(Runnable r, Throwable t) {
+     *     super.afterExecute(r, t);
+     *     if (t == null && r instanceof Future) {
+     *       try {
+     *         Object result = ((Future) r).get();
+     *       } catch (CancellationException ce) {
+     *           t = ce;
+     *       } catch (ExecutionException ee) {
+     *           t = ee.getCause();
+     *       } catch (InterruptedException ie) {
+     *           Thread.currentThread().interrupt(); // ignore/reset
+     *       }
+     *     }
+     *     if (t != null)
+     *       System.out.println(t);
+     *   }
+     * }}
+ * + * @param r the runnable that has completed + * @param t the exception that caused termination, or null if + * execution completed normally + */ + protected void afterExecute(Runnable r, Throwable t) { } + + /** + * Method invoked when the Executor has terminated. Default + * implementation does nothing. Note: To properly nest multiple + * overridings, subclasses should generally invoke + * {@code super.terminated} within this method. + */ + protected void terminated() { } + + /* Predefined RejectedExecutionHandlers */ + + /** + * A handler for rejected tasks that runs the rejected task + * directly in the calling thread of the {@code execute} method, + * unless the executor has been shut down, in which case the task + * is discarded. + */ + public static class CallerRunsPolicy implements RejectedExecutionHandler { + /** + * Creates a {@code CallerRunsPolicy}. + */ + public CallerRunsPolicy() { } + + /** + * Executes task r in the caller's thread, unless the executor + * has been shut down, in which case the task is discarded. + * + * @param r the runnable task requested to be executed + * @param e the executor attempting to execute this task + */ + public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { + if (!e.isShutdown()) { + r.run(); + } + } + } + + /** + * A handler for rejected tasks that throws a + * {@code RejectedExecutionException}. + */ + public static class AbortPolicy implements RejectedExecutionHandler { + /** + * Creates an {@code AbortPolicy}. + */ + public AbortPolicy() { } + + /** + * Always throws RejectedExecutionException. + * + * @param r the runnable task requested to be executed + * @param e the executor attempting to execute this task + * @throws RejectedExecutionException always + */ + public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { + throw new RejectedExecutionException("Task " + r.toString() + + " rejected from " + + e.toString()); + } + } + + /** + * A handler for rejected tasks that silently discards the + * rejected task. + */ + public static class DiscardPolicy implements RejectedExecutionHandler { + /** + * Creates a {@code DiscardPolicy}. + */ + public DiscardPolicy() { } + + /** + * Does nothing, which has the effect of discarding task r. + * + * @param r the runnable task requested to be executed + * @param e the executor attempting to execute this task + */ + public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { + } + } + + /** + * A handler for rejected tasks that discards the oldest unhandled + * request and then retries {@code execute}, unless the executor + * is shut down, in which case the task is discarded. + */ + public static class DiscardOldestPolicy implements RejectedExecutionHandler { + /** + * Creates a {@code DiscardOldestPolicy} for the given executor. + */ + public DiscardOldestPolicy() { } + + /** + * Obtains and ignores the next task that the executor + * would otherwise execute, if one is immediately available, + * and then retries execution of task r, unless the executor + * is shut down, in which case task r is instead discarded. + * + * @param r the runnable task requested to be executed + * @param e the executor attempting to execute this task + */ + public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { + if (!e.isShutdown()) { + e.getQueue().poll(); + e.execute(r); + } + } + } +} diff --git a/sources/src/main/java/io/akarin/server/core/AkarinGlobalConfig.java b/sources/src/main/java/io/akarin/server/core/AkarinGlobalConfig.java index d3ed79e1d..2d64c651f 100644 --- a/sources/src/main/java/io/akarin/server/core/AkarinGlobalConfig.java +++ b/sources/src/main/java/io/akarin/server/core/AkarinGlobalConfig.java @@ -160,11 +160,6 @@ public class AkarinGlobalConfig { playersPerIOThread = getInt("core.players-per-chunk-io-thread", 50); } - public static boolean silentAsyncTimings; - private static void silentAsyncTimings() { - silentAsyncTimings = getBoolean("core.always-silent-async-timing", false); - } - public static long timeUpdateInterval; private static void timeUpdateInterval() { timeUpdateInterval = getSeconds(getString("core.tick-rate.world-time-update-interval", "1s")) * 10; @@ -257,4 +252,9 @@ public class AkarinGlobalConfig { private static void fileIOThreads() { fileIOThreads = getInt("core.chunk-save-threads", 2); } + + public static int parallelMode; + private static void parallelMode() { + parallelMode = getInt("core.parallel-mode", 1); + } } diff --git a/sources/src/main/java/io/akarin/server/mixin/core/MixinMinecraftServer.java b/sources/src/main/java/io/akarin/server/mixin/core/MixinMinecraftServer.java index cc7cc916a..61f89d2b6 100644 --- a/sources/src/main/java/io/akarin/server/mixin/core/MixinMinecraftServer.java +++ b/sources/src/main/java/io/akarin/server/mixin/core/MixinMinecraftServer.java @@ -60,11 +60,9 @@ public abstract class MixinMinecraftServer { shift = At.Shift.BEFORE )) private void prerun(CallbackInfo info) { - Akari.STAGE_ENTITY_TICK = new ExecutorCompletionService<>(Executors.newFixedThreadPool((cachedWorldSize = worlds.size()), new AssignableFactory("Akarin Parallel Entity Ticking Thread - $"))); - Akari.STAGE_WORLD_TICK = new ExecutorCompletionService<>(Executors.newFixedThreadPool(cachedWorldSize - 1, new AssignableFactory("Akarin Parallel World Ticking Thread - $"))); - primaryThread.setPriority(AkarinGlobalConfig.primaryThreadPriority < Thread.NORM_PRIORITY ? Thread.NORM_PRIORITY : (AkarinGlobalConfig.primaryThreadPriority > Thread.MAX_PRIORITY ? 10 : AkarinGlobalConfig.primaryThreadPriority)); + Akari.resizeTickExecutors((cachedWorldSize = worlds.size())); for (int i = 0; i < worlds.size(); ++i) { WorldServer world = worlds.get(i); @@ -157,7 +155,11 @@ public abstract class MixinMinecraftServer { private boolean tickEntities(WorldServer world) { try { + world.timings.tickEntities.startTiming(); world.tickEntities(); + world.timings.tickEntities.stopTiming(); + world.getTracker().updatePlayers(); + world.explosionDensityCache.clear(); // Paper - Optimize explosions } catch (Throwable throwable) { CrashReport crashreport; try { @@ -173,7 +175,9 @@ public abstract class MixinMinecraftServer { private void tickWorld(WorldServer world) { try { + world.timings.doTick.startTiming(); world.doTick(); + world.timings.doTick.stopTiming(); } catch (Throwable throwable) { CrashReport crashreport; try { @@ -213,71 +217,88 @@ public abstract class MixinMinecraftServer { ChunkIOExecutor.tick(); MinecraftTimings.chunkIOTickTimer.stopTiming(); - Akari.worldTiming.startTiming(); - // Resize - if (cachedWorldSize != worlds.size()) { - Akari.STAGE_ENTITY_TICK = new ExecutorCompletionService<>(Executors.newFixedThreadPool(cachedWorldSize = worlds.size(), new AssignableFactory("Akarin Parallel Entity Ticking Thread - $"))); - Akari.STAGE_WORLD_TICK = new ExecutorCompletionService<>(Executors.newFixedThreadPool(cachedWorldSize - 1, new AssignableFactory("Akarin Parallel World Ticking Thread - $"))); - } - tickedPrimaryEntities = false; - // Never tick one world concurrently! - int worldSize = worlds.size(); - for (int i = 0; i < worldSize; i++) { - // Impl Note: - // Entities ticking: index 2 -> ... -> 0 -> 1 (parallel) - // World ticking: index 1 -> ... (parallel) | 0 (main thread) - int interlaceEntity = i + 2; - WorldServer entityWorld = null; - if (interlaceEntity < worldSize) { - entityWorld = worlds.get(interlaceEntity); - } else { - if (tickedPrimaryEntities) { - entityWorld = worlds.get(1); - } else { - entityWorld = worlds.get(0); - tickedPrimaryEntities = true; - } - } - entityWorld.timings.tickEntities.startTiming(); - WorldServer fEntityWorld = entityWorld; - Akari.STAGE_ENTITY_TICK.submit(() -> { - synchronized (((IMixinWorldServer) fEntityWorld).lock()) { - tickEntities(fEntityWorld); - fEntityWorld.getTracker().updatePlayers(); - fEntityWorld.explosionDensityCache.clear(); // Paper - Optimize explosions - } - }, entityWorld); - - int interlaceWorld = i + 1; - if (interlaceWorld < worldSize) { - WorldServer world = worlds.get(interlaceWorld); - world.timings.doTick.startTiming(); - Akari.STAGE_WORLD_TICK.submit(() -> { - synchronized (((IMixinWorldServer) world).lock()) { - tickWorld(world); + if (cachedWorldSize != worlds.size()) Akari.resizeTickExecutors((cachedWorldSize = worlds.size())); + switch (AkarinGlobalConfig.parallelMode) { + case 1: + case 2: + default: + // Never tick one world concurrently! + for (int i = 0; i < cachedWorldSize; i++) { + // Impl Note: + // Entities ticking: index 1 -> ... -> 0 (parallel) + // World ticking: index 0 -> ... (parallel) + int interlace = i + 1; + WorldServer entityWorld = worlds.get(interlace < cachedWorldSize ? interlace : 0); + Akari.STAGE_TICK.submit(() -> { + synchronized (((IMixinWorldServer) entityWorld).lock()) { + tickEntities(entityWorld); + } + }, null/*new TimingSignal(entityWorld, true)*/); + + if (AkarinGlobalConfig.parallelMode != 1) { + int fi = i; + Akari.STAGE_TICK.submit(() -> { + WorldServer world = worlds.get(fi); + synchronized (((IMixinWorldServer) world).lock()) { + tickWorld(world); + } + }, null); } - }, world); - } + } + + if (AkarinGlobalConfig.parallelMode == 1) + Akari.STAGE_TICK.submit(() -> { + for (int i = 0; i < cachedWorldSize; i++) { + WorldServer world = worlds.get(i); + synchronized (((IMixinWorldServer) world).lock()) { + tickWorld(world); + } + } + }, null); + + for (int i = (AkarinGlobalConfig.parallelMode == 1 ? cachedWorldSize + 1 : cachedWorldSize * 2); i --> 0 ;) { + Akari.STAGE_TICK.take(); + } + + /* for (int i = (AkarinGlobalConfig.parallelMode == 1 ? cachedWorldSize : cachedWorldSize * 2); i --> 0 ;) { + long startTiming = System.nanoTime(); + TimingSignal signal = Akari.STAGE_TICK.take().get(); + IMixinTimingHandler timing = (IMixinTimingHandler) (signal.isEntities ? signal.tickedWorld.timings.tickEntities : signal.tickedWorld.timings.doTick); + timing.stopTiming(startTiming); // The overlap will be ignored + } */ + + break; + case 0: + Akari.STAGE_TICK.submit(() -> { + for (int i = 1; i <= cachedWorldSize; ++i) { + WorldServer world = worlds.get(i < cachedWorldSize ? i : 0); + synchronized (((IMixinWorldServer) world).lock()) { + tickEntities(world); + } + } + }, null); + + Akari.STAGE_TICK.submit(() -> { + for (int i = 0; i < cachedWorldSize; ++i) { + WorldServer world = worlds.get(i); + synchronized (((IMixinWorldServer) world).lock()) { + tickWorld(world); + } + } + }, null); + + Akari.STAGE_TICK.take(); + Akari.STAGE_TICK.take(); + break; + case -1: + for (int i = 0; i < cachedWorldSize; ++i) { + WorldServer world = worlds.get(i); + tickWorld(world); + tickEntities(world); + } + break; } - WorldServer primaryWorld = worlds.get(0); - primaryWorld.timings.doTick.startTiming(); - synchronized (((IMixinWorldServer) primaryWorld).lock()) { - tickWorld(primaryWorld); - } - primaryWorld.timings.doTick.stopTiming(); - - for (int i = 0; i < worldSize; i++) { - WorldServer world = worlds.get(i); - long startTiming = System.nanoTime(); - if (i != 0) { - ((TimingHandler) Akari.STAGE_WORLD_TICK.take().get().timings.doTick).stopTiming(startTiming); - startTiming = System.nanoTime(); - } - ((TimingHandler) Akari.STAGE_ENTITY_TICK.take().get().timings.tickEntities).stopTiming(startTiming); - } - Akari.worldTiming.stopTiming(); - Akari.callbackTiming.startTiming(); while ((runnable = Akari.callbackQueue.poll()) != null) runnable.run(); Akari.callbackTiming.stopTiming(); diff --git a/sources/src/main/java/io/akarin/server/mixin/core/MixinTimingHandler.java b/sources/src/main/java/io/akarin/server/mixin/core/MixinTimingHandler.java index 1f89e8547..54287e03a 100644 --- a/sources/src/main/java/io/akarin/server/mixin/core/MixinTimingHandler.java +++ b/sources/src/main/java/io/akarin/server/mixin/core/MixinTimingHandler.java @@ -1,52 +1,35 @@ package io.akarin.server.mixin.core; -import java.util.logging.Level; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; -import org.bukkit.Bukkit; import org.spongepowered.asm.mixin.Final; import org.spongepowered.asm.mixin.Mixin; import org.spongepowered.asm.mixin.Overwrite; import org.spongepowered.asm.mixin.Shadow; -import org.spongepowered.asm.mixin.injection.At; -import org.spongepowered.asm.mixin.injection.Inject; -import org.spongepowered.asm.mixin.injection.callback.CallbackInfoReturnable; - import co.aikar.timings.Timing; -import co.aikar.timings.TimingHandler; -import io.akarin.api.internal.Akari; -import io.akarin.api.internal.Akari.AssignableThread; -import io.akarin.server.core.AkarinGlobalConfig; -import net.minecraft.server.MinecraftServer; -@Mixin(value = TimingHandler.class, remap = false) +@Mixin(targets = "co.aikar.timings.TimingHandler", remap = false) public abstract class MixinTimingHandler { @Shadow @Final String name; @Shadow private boolean enabled; - @Shadow private volatile long start; - @Shadow private volatile int timingDepth; + @Shadow private AtomicLong start; + @Shadow private AtomicInteger timingDepth; @Shadow abstract void addDiff(long diff); @Shadow public abstract Timing startTiming(); @Overwrite public Timing startTimingIfSync() { - if (Akari.isPrimaryThread(false)) { - startTiming(); - } + startTiming(); return (Timing) this; } - @SuppressWarnings({"rawtypes", "unchecked"}) - @Inject(method = "startTiming", at = @At("HEAD"), cancellable = true) - public void onStartTiming(CallbackInfoReturnable cir) { - if (!Akari.isPrimaryThread(false)) cir.setReturnValue(this); // Avoid modify any field - } - @Overwrite public void stopTimingIfSync() { - if (Akari.isPrimaryThread(false)) { + //if (Akari.isPrimaryThread(false)) { stopTiming(true); // Avoid twice thread check - } + //} } @Overwrite @@ -54,20 +37,26 @@ public abstract class MixinTimingHandler { stopTiming(false); } + public void stopTiming(long start) { + if (enabled) addDiff(System.nanoTime() - start); + } + public void stopTiming(boolean alreadySync) { - Thread curThread = Thread.currentThread(); - if (!enabled || curThread instanceof AssignableThread) return; - if (!alreadySync && curThread != MinecraftServer.getServer().primaryThread) { - if (AkarinGlobalConfig.silentAsyncTimings) return; - - Bukkit.getLogger().log(Level.SEVERE, "stopTiming called async for " + name); - Thread.dumpStack(); - } + if (!enabled || timingDepth.decrementAndGet() != 0 || start.get() == 0) return; + /*if (!alreadySync) { + Thread curThread = Thread.currentThread(); + if (curThread != MinecraftServer.getServer().primaryThread) { + if (false && !AkarinGlobalConfig.silentAsyncTimings) { + Bukkit.getLogger().log(Level.SEVERE, "stopTiming called async for " + name); + Thread.dumpStack(); + } + start = 0; + return; + } + }*/ - // Main thread ensured - if (--timingDepth == 0 && start != 0) { - addDiff(System.nanoTime() - start); - start = 0; - } + // Safety ensured + long prev = start.getAndSet(0); // Akarin + addDiff(System.nanoTime() - prev); // Akarin } }