9
0
mirror of https://github.com/WiIIiam278/HuskSync.git synced 2025-12-19 14:59:21 +00:00

feat: improve data syncing with checkin petitions

This improves data fetching speed in cases where a user logs out during sync application; when they log back in, the server will petition the server they are checked out on to check them out.

We also now unlock users after saving sync on a server to accommodate this, and track user disconnection status to avoid inconsistencies with what platforms return for `isOnline`
This commit is contained in:
William278
2025-03-23 16:15:00 +00:00
parent ef7b3c4f32
commit 937ea9bc8e
17 changed files with 223 additions and 85 deletions

View File

@@ -99,6 +99,7 @@ public class BukkitHuskSync extends JavaPlugin implements HuskSync, BukkitTask.S
private final Map<Integer, MapView> mapViews = Maps.newConcurrentMap(); private final Map<Integer, MapView> mapViews = Maps.newConcurrentMap();
private final List<Migrator> availableMigrators = Lists.newArrayList(); private final List<Migrator> availableMigrators = Lists.newArrayList();
private final Set<UUID> lockedPlayers = Sets.newConcurrentHashSet(); private final Set<UUID> lockedPlayers = Sets.newConcurrentHashSet();
private final Set<UUID> disconnectingPlayers = Sets.newConcurrentHashSet();
private boolean disabling; private boolean disabling;
private Gson gson; private Gson gson;

View File

@@ -31,7 +31,11 @@ public interface BukkitUserDataHolder extends UserDataHolder {
@Override @Override
default Optional<? extends Data> getData(@NotNull Identifier id) { default Optional<? extends Data> getData(@NotNull Identifier id) {
if (!id.isCustom()) { if (id.isCustom()) {
return Optional.ofNullable(getCustomDataStore().get(id));
}
try {
return switch (id.getKeyValue()) { return switch (id.getKeyValue()) {
case "inventory" -> getInventory(); case "inventory" -> getInventory();
case "ender_chest" -> getEnderChest(); case "ender_chest" -> getEnderChest();
@@ -48,8 +52,10 @@ public interface BukkitUserDataHolder extends UserDataHolder {
case "persistent_data" -> getPersistentData(); case "persistent_data" -> getPersistentData();
default -> throw new IllegalStateException(String.format("Unexpected data type: %s", id)); default -> throw new IllegalStateException(String.format("Unexpected data type: %s", id));
}; };
} catch (Throwable e) {
getPlugin().debug("Failed to get data for key: " + id.asMinimalString(), e);
return Optional.empty();
} }
return Optional.ofNullable(getCustomDataStore().get(id));
} }
@Override @Override

View File

@@ -44,6 +44,7 @@ public class PaperEventListener extends BukkitEventListener {
} }
@Override @Override
@SuppressWarnings("RedundantMethodOverride")
public void onEnable() { public void onEnable() {
getPlugin().getServer().getPluginManager().registerEvents(this, getPlugin()); getPlugin().getServer().getPluginManager().registerEvents(this, getPlugin());
lockedHandler.onEnable(); lockedHandler.onEnable();

View File

@@ -57,8 +57,9 @@ public class BukkitUser extends OnlineUser implements BukkitUserDataHolder {
} }
@Override @Override
public boolean isOffline() { public boolean hasDisconnected() {
return player == null || !player.isOnline(); return getPlugin().getDisconnectingPlayers().contains(getUuid())
|| player == null || !player.isOnline();
} }
@Override @Override

View File

@@ -20,6 +20,7 @@
package net.william278.husksync; package net.william278.husksync;
import com.fatboyindustrial.gsonjavatime.Converters; import com.fatboyindustrial.gsonjavatime.Converters;
import com.google.common.collect.Maps;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.gson.GsonBuilder; import com.google.gson.GsonBuilder;
import net.kyori.adventure.audience.Audience; import net.kyori.adventure.audience.Audience;
@@ -140,7 +141,7 @@ public interface HuskSync extends Task.Supplier, EventDispatcher, ConfigProvider
if (getPlayerCustomDataStore().containsKey(user.getUuid())) { if (getPlayerCustomDataStore().containsKey(user.getUuid())) {
return getPlayerCustomDataStore().get(user.getUuid()); return getPlayerCustomDataStore().get(user.getUuid());
} }
final Map<Identifier, Data> data = new HashMap<>(); final Map<Identifier, Data> data = Maps.newHashMap();
getPlayerCustomDataStore().put(user.getUuid(), data); getPlayerCustomDataStore().put(user.getUuid(), data);
return data; return data;
} }
@@ -315,6 +316,12 @@ public interface HuskSync extends Task.Supplier, EventDispatcher, ConfigProvider
@NotNull @NotNull
Set<UUID> getLockedPlayers(); Set<UUID> getLockedPlayers();
/**
* Get the set of UUIDs of players who are currently marked as disconnecting or disconnected
*/
@NotNull
Set<UUID> getDisconnectingPlayers();
default boolean isLocked(@NotNull UUID uuid) { default boolean isLocked(@NotNull UUID uuid) {
return getLockedPlayers().contains(uuid); return getLockedPlayers().contains(uuid);
} }

View File

@@ -22,6 +22,7 @@ package net.william278.husksync.data;
import lombok.*; import lombok.*;
import net.kyori.adventure.key.InvalidKeyException; import net.kyori.adventure.key.InvalidKeyException;
import net.kyori.adventure.key.Key; import net.kyori.adventure.key.Key;
import net.kyori.adventure.key.KeyPattern;
import org.intellij.lang.annotations.Subst; import org.intellij.lang.annotations.Subst;
import org.jetbrains.annotations.ApiStatus; import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
@@ -39,6 +40,9 @@ import java.util.stream.Stream;
@Getter @Getter
public class Identifier { public class Identifier {
// Namespace for built-in identifiers
private static final @KeyPattern String DEFAULT_NAMESPACE = "husksync";
// Built-in identifiers // Built-in identifiers
public static final Identifier PERSISTENT_DATA = huskSync("persistent_data", true); public static final Identifier PERSISTENT_DATA = huskSync("persistent_data", true);
public static final Identifier INVENTORY = huskSync("inventory", true); public static final Identifier INVENTORY = huskSync("inventory", true);
@@ -93,8 +97,8 @@ public class Identifier {
*/ */
@NotNull @NotNull
public static Identifier from(@NotNull Key key, @NotNull Set<Dependency> dependencies) { public static Identifier from(@NotNull Key key, @NotNull Set<Dependency> dependencies) {
if (key.namespace().equals("husksync")) { if (key.namespace().equals(DEFAULT_NAMESPACE)) {
throw new IllegalArgumentException("You cannot register a key with \"husksync\" as the namespace!"); throw new IllegalArgumentException("Cannot register with %s as key namespace!".formatted(key.namespace()));
} }
return new Identifier(key, true, dependencies); return new Identifier(key, true, dependencies);
} }
@@ -143,7 +147,7 @@ public class Identifier {
@NotNull @NotNull
private static Identifier huskSync(@Subst("null") @NotNull String name, private static Identifier huskSync(@Subst("null") @NotNull String name,
boolean configDefault) throws InvalidKeyException { boolean configDefault) throws InvalidKeyException {
return new Identifier(Key.key("husksync", name), configDefault, Collections.emptySet()); return new Identifier(Key.key(DEFAULT_NAMESPACE, name), configDefault, Collections.emptySet());
} }
// Return an identifier with a HuskSync namespace // Return an identifier with a HuskSync namespace
@@ -151,7 +155,7 @@ public class Identifier {
private static Identifier huskSync(@Subst("null") @NotNull String name, private static Identifier huskSync(@Subst("null") @NotNull String name,
@SuppressWarnings("SameParameterValue") boolean configDefault, @SuppressWarnings("SameParameterValue") boolean configDefault,
@NotNull Dependency... dependents) throws InvalidKeyException { @NotNull Dependency... dependents) throws InvalidKeyException {
return new Identifier(Key.key("husksync", name), configDefault, Set.of(dependents)); return new Identifier(Key.key(DEFAULT_NAMESPACE, name), configDefault, Set.of(dependents));
} }
/** /**
@@ -209,13 +213,30 @@ public class Identifier {
* @return {@code false} if {@link #getKeyNamespace()} returns "husksync"; {@code true} otherwise * @return {@code false} if {@link #getKeyNamespace()} returns "husksync"; {@code true} otherwise
*/ */
public boolean isCustom() { public boolean isCustom() {
return !getKeyNamespace().equals("husksync"); return !getKeyNamespace().equals(DEFAULT_NAMESPACE);
}
/**
* Get the minimal string representation of this key.
* <p>
* If the namespace of the key is {@link #DEFAULT_NAMESPACE}, only the key value will be returned.
*
* @return the minimal string key representation
* @since 3.8
*/
@NotNull
public String asMinimalString() {
if (getKey().namespace().equals(DEFAULT_NAMESPACE)) {
return getKey().value();
}
return getKey().asString();
} }
/** /**
* Returns the identifier as a string (the key) * Returns the identifier as a string (the key)
* *
* @return the identifier as a string * @return the identifier as a string
* @since 3.0
*/ */
@NotNull @NotNull
@Override @Override
@@ -224,19 +245,29 @@ public class Identifier {
} }
/** /**
* Returns {@code true} if the given object is an identifier with the same key as this identifier * Return whether this Identifier is equal to another Identifier
* *
* @param obj the object to compare * @param obj another object
* @return {@code true} if the given object is an identifier with the same key as this identifier * @return {@code true} if this identifier matches the identifier of {@code obj}
* @since 3.8
*/ */
@Override @Override
public boolean equals(@Nullable Object obj) { public boolean equals(@Nullable Object obj) {
return obj instanceof Identifier other ? toString().equals(other.toString()) : super.equals(obj); if (obj instanceof Identifier other) {
return asMinimalString().equals(other.asMinimalString());
}
return false;
} }
/**
* Get the hash code of the Identifier (equivalent to {@link #asMinimalString()}->{@code #hashCode()}
*
* @return the hash code
* @since 3.8
*/
@Override @Override
public int hashCode() { public int hashCode() {
return key.toString().hashCode(); return asMinimalString().hashCode();
} }
// Get the config entry for the identifier // Get the config entry for the identifier

View File

@@ -75,6 +75,15 @@ public interface UserDataHolder extends DataHolder {
return DataSnapshot.builder(getPlugin()).data(this.getData()).saveCause(saveCause).buildAndPack(); return DataSnapshot.builder(getPlugin()).data(this.getData()).saveCause(saveCause).buildAndPack();
} }
/**
* Returns whether data can be applied to the holder at this time
*
* @return {@code true} if data can be applied, otherwise false
*/
default boolean cannotApplySnapshot() {
return false;
}
/** /**
* Deserialize and apply a data snapshot to this data owner * Deserialize and apply a data snapshot to this data owner
* <p> * <p>
@@ -90,9 +99,12 @@ public interface UserDataHolder extends DataHolder {
* @since 3.0 * @since 3.0
*/ */
default void applySnapshot(@NotNull DataSnapshot.Packed snapshot, @NotNull ThrowingConsumer<Boolean> runAfter) { default void applySnapshot(@NotNull DataSnapshot.Packed snapshot, @NotNull ThrowingConsumer<Boolean> runAfter) {
final HuskSync plugin = getPlugin(); if (cannotApplySnapshot()) {
return;
}
// Unpack the snapshot // Unpack the snapshot
final HuskSync plugin = getPlugin();
final DataSnapshot.Unpacked unpacked; final DataSnapshot.Unpacked unpacked;
try { try {
unpacked = snapshot.unpack(plugin); unpacked = snapshot.unpack(plugin);
@@ -104,6 +116,10 @@ public interface UserDataHolder extends DataHolder {
// Synchronously attempt to apply the snapshot // Synchronously attempt to apply the snapshot
plugin.runSync(() -> { plugin.runSync(() -> {
if (cannotApplySnapshot()) {
return;
}
try { try {
for (Map.Entry<Identifier, Data> entry : unpacked.getData().entrySet()) { for (Map.Entry<Identifier, Data> entry : unpacked.getData().entrySet()) {
final Identifier identifier = entry.getKey(); final Identifier identifier = entry.getKey();

View File

@@ -25,9 +25,7 @@ import net.william278.husksync.data.DataSnapshot;
import net.william278.husksync.user.OnlineUser; import net.william278.husksync.user.OnlineUser;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import java.util.Arrays; import java.util.*;
import java.util.List;
import java.util.Map;
import static net.william278.husksync.config.Settings.SynchronizationSettings.SaveOnDeathSettings; import static net.william278.husksync.config.Settings.SynchronizationSettings.SaveOnDeathSettings;
@@ -49,6 +47,7 @@ public abstract class EventListener {
* @param user The {@link OnlineUser} to handle * @param user The {@link OnlineUser} to handle
*/ */
protected final void handlePlayerJoin(@NotNull OnlineUser user) { protected final void handlePlayerJoin(@NotNull OnlineUser user) {
plugin.getDisconnectingPlayers().remove(user.getUuid());
if (user.isNpc()) { if (user.isNpc()) {
return; return;
} }
@@ -62,12 +61,18 @@ public abstract class EventListener {
* @param user The {@link OnlineUser} to handle * @param user The {@link OnlineUser} to handle
*/ */
protected final void handlePlayerQuit(@NotNull OnlineUser user) { protected final void handlePlayerQuit(@NotNull OnlineUser user) {
if (user.isNpc() || plugin.isDisabling() || plugin.isLocked(user.getUuid())) { // Check the user is a user, the plugin isn't disabling, then mark as disconnecting
if (user.isNpc() || plugin.isDisabling()) {
return; return;
} }
plugin.getDisconnectingPlayers().add(user.getUuid());
// Lock, then save their data if the user is unlocked
if (!plugin.isLocked(user.getUuid())) {
plugin.lockPlayer(user.getUuid()); plugin.lockPlayer(user.getUuid());
plugin.getDataSyncer().syncSaveUserData(user); plugin.getDataSyncer().syncSaveUserData(user);
} }
}
/** /**
* Handles the saving of data when the world save event is fired * Handles the saving of data when the world save event is fired
@@ -79,7 +84,7 @@ public abstract class EventListener {
return; return;
} }
usersInWorld.stream() usersInWorld.stream()
.filter(user -> !plugin.isLocked(user.getUuid()) && !user.isNpc()) .filter(user -> !user.isNpc() && !user.hasDisconnected() && !plugin.isLocked(user.getUuid()))
.forEach(user -> plugin.getDataSyncer().saveCurrentUserData( .forEach(user -> plugin.getDataSyncer().saveCurrentUserData(
user, DataSnapshot.SaveCause.WORLD_SAVE user, DataSnapshot.SaveCause.WORLD_SAVE
)); ));

View File

@@ -158,7 +158,7 @@ public class RedisManager extends JedisPubSub {
final RedisMessage redisMessage = RedisMessage.fromJson(plugin, message); final RedisMessage redisMessage = RedisMessage.fromJson(plugin, message);
switch (messageType) { switch (messageType) {
case UPDATE_USER_DATA -> plugin.getOnlineUser(redisMessage.getTargetUuid()).ifPresent( case UPDATE_USER_DATA -> redisMessage.getTargetUser(plugin).ifPresent(
user -> { user -> {
plugin.lockPlayer(user.getUuid()); plugin.lockPlayer(user.getUuid());
try { try {
@@ -170,16 +170,30 @@ public class RedisManager extends JedisPubSub {
} }
} }
); );
case REQUEST_USER_DATA -> plugin.getOnlineUser(redisMessage.getTargetUuid()).ifPresent( case REQUEST_USER_DATA -> redisMessage.getTargetUser(plugin).ifPresent(
user -> RedisMessage.create( user -> RedisMessage.create(
UUID.fromString(new String(redisMessage.getPayload(), StandardCharsets.UTF_8)), UUID.fromString(new String(redisMessage.getPayload(), StandardCharsets.UTF_8)),
user.createSnapshot(DataSnapshot.SaveCause.INVENTORY_COMMAND).asBytes(plugin) user.createSnapshot(DataSnapshot.SaveCause.INVENTORY_COMMAND).asBytes(plugin)
).dispatch(plugin, RedisMessage.Type.RETURN_USER_DATA) ).dispatch(plugin, RedisMessage.Type.RETURN_USER_DATA)
); );
case CHECK_IN_PETITION -> {
if (!redisMessage.isTargetServer(plugin)) {
return;
}
final String payload = new String(redisMessage.getPayload(), StandardCharsets.UTF_8);
final User user = new User(UUID.fromString(payload.split("/")[0]), payload.split("/")[1]);
boolean online = plugin.getDisconnectingPlayers().contains(user.getUuid())
|| plugin.getOnlineUser(user.getUuid()).isEmpty();
if (!online && !plugin.isLocked(user.getUuid())) {
plugin.debug("[%s] Received check-in petition for online/unlocked user, ignoring".formatted(user.getName()));
return;
}
plugin.getRedisManager().setUserCheckedOut(user, false);
plugin.debug("[%s] Received petition for offline user, checking them in".formatted(user.getName()));
}
case RETURN_USER_DATA -> { case RETURN_USER_DATA -> {
final CompletableFuture<Optional<DataSnapshot.Packed>> future = pendingRequests.get( final UUID target = redisMessage.getTargetUuid().orElse(null);
redisMessage.getTargetUuid() final CompletableFuture<Optional<DataSnapshot.Packed>> future = pendingRequests.get(target);
);
if (future != null) { if (future != null) {
try { try {
final DataSnapshot.Packed data = DataSnapshot.deserialize(plugin, redisMessage.getPayload()); final DataSnapshot.Packed data = DataSnapshot.deserialize(plugin, redisMessage.getPayload());
@@ -188,7 +202,7 @@ public class RedisManager extends JedisPubSub {
plugin.log(Level.SEVERE, "An exception occurred returning user data from Redis", e); plugin.log(Level.SEVERE, "An exception occurred returning user data from Redis", e);
future.complete(Optional.empty()); future.complete(Optional.empty());
} }
pendingRequests.remove(redisMessage.getTargetUuid()); pendingRequests.remove(target);
} }
} }
} }
@@ -211,11 +225,17 @@ public class RedisManager extends JedisPubSub {
} }
} }
@Blocking
public void sendUserDataUpdate(@NotNull User user, @NotNull DataSnapshot.Packed data) { public void sendUserDataUpdate(@NotNull User user, @NotNull DataSnapshot.Packed data) {
plugin.runAsync(() -> {
final RedisMessage redisMessage = RedisMessage.create(user.getUuid(), data.asBytes(plugin)); final RedisMessage redisMessage = RedisMessage.create(user.getUuid(), data.asBytes(plugin));
redisMessage.dispatch(plugin, RedisMessage.Type.UPDATE_USER_DATA); redisMessage.dispatch(plugin, RedisMessage.Type.UPDATE_USER_DATA);
}); }
@Blocking
public void petitionServerCheckin(@NotNull String server, @NotNull User user) {
final RedisMessage redisMessage = RedisMessage.create(
server, "%s/%s".formatted(user.getUuid(), user.getName()).getBytes(StandardCharsets.UTF_8));
redisMessage.dispatch(plugin, RedisMessage.Type.CHECK_IN_PETITION);
} }
public CompletableFuture<Optional<DataSnapshot.Packed>> getOnlineUserData(@NotNull UUID requestId, @NotNull User user, public CompletableFuture<Optional<DataSnapshot.Packed>> getOnlineUserData(@NotNull UUID requestId, @NotNull User user,
@@ -421,7 +441,7 @@ public class RedisManager extends JedisPubSub {
final long startTime = System.currentTimeMillis(); final long startTime = System.currentTimeMillis();
try (Jedis jedis = jedisPool.getResource()) { try (Jedis jedis = jedisPool.getResource()) {
jedis.ping(); jedis.ping();
return startTime - System.currentTimeMillis(); return System.currentTimeMillis() - startTime;
} }
} }

View File

@@ -25,25 +25,38 @@ import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import net.william278.husksync.HuskSync; import net.william278.husksync.HuskSync;
import net.william278.husksync.adapter.Adaptable; import net.william278.husksync.adapter.Adaptable;
import net.william278.husksync.user.OnlineUser;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.Arrays; import java.util.Arrays;
import java.util.Locale; import java.util.Locale;
import java.util.Optional; import java.util.Optional;
import java.util.UUID; import java.util.UUID;
@Setter
public class RedisMessage implements Adaptable { public class RedisMessage implements Adaptable {
private @Nullable String targetServer;
@SerializedName("target_uuid") @SerializedName("target_uuid")
private UUID targetUuid; private @Nullable UUID targetUuid;
@Getter @Getter
@Setter @Setter
@SerializedName("payload") @SerializedName("payload")
private byte[] payload; private byte[] payload;
private RedisMessage(byte[] payload) {
setPayload(payload);
}
private RedisMessage(@NotNull UUID targetUuid, byte[] message) { private RedisMessage(@NotNull UUID targetUuid, byte[] message) {
this(message);
this.setTargetUuid(targetUuid); this.setTargetUuid(targetUuid);
this.setPayload(message); }
private RedisMessage(@NotNull String targetServer, byte[] message) {
this(message);
this.setTargetServer(targetServer);
} }
@SuppressWarnings("unused") @SuppressWarnings("unused")
@@ -55,6 +68,11 @@ public class RedisMessage implements Adaptable {
return new RedisMessage(targetUuid, message); return new RedisMessage(targetUuid, message);
} }
@NotNull
public static RedisMessage create(@NotNull String targetServer, byte[] message) {
return new RedisMessage(targetServer, message);
}
@NotNull @NotNull
public static RedisMessage fromJson(@NotNull HuskSync plugin, @NotNull String json) throws JsonSyntaxException { public static RedisMessage fromJson(@NotNull HuskSync plugin, @NotNull String json) throws JsonSyntaxException {
return plugin.getGson().fromJson(json, RedisMessage.class); return plugin.getGson().fromJson(json, RedisMessage.class);
@@ -67,20 +85,23 @@ public class RedisMessage implements Adaptable {
)); ));
} }
@NotNull public Optional<UUID> getTargetUuid() {
public UUID getTargetUuid() { return Optional.ofNullable(targetUuid);
return targetUuid;
} }
public void setTargetUuid(@NotNull UUID targetUuid) { public Optional<OnlineUser> getTargetUser(@NotNull HuskSync plugin) {
this.targetUuid = targetUuid; return getTargetUuid().flatMap(plugin::getOnlineUser);
}
public boolean isTargetServer(@NotNull HuskSync plugin) {
return targetServer != null && targetServer.equals(plugin.getServerName());
} }
public enum Type { public enum Type {
UPDATE_USER_DATA, UPDATE_USER_DATA,
REQUEST_USER_DATA, REQUEST_USER_DATA,
RETURN_USER_DATA; RETURN_USER_DATA,
CHECK_IN_PETITION;
@NotNull @NotNull
public String getMessageChannel(@NotNull String clusterId) { public String getMessageChannel(@NotNull String clusterId) {

View File

@@ -185,7 +185,7 @@ public abstract class DataSyncer {
final AtomicReference<Task.Repeating> task = new AtomicReference<>(); final AtomicReference<Task.Repeating> task = new AtomicReference<>();
final AtomicBoolean processing = new AtomicBoolean(false); final AtomicBoolean processing = new AtomicBoolean(false);
final Runnable runnable = () -> { final Runnable runnable = () -> {
if (user.isOffline()) { if (user.cannotApplySnapshot()) {
task.get().cancel(); task.get().cancel();
return; return;
} }

View File

@@ -62,7 +62,10 @@ public class DelayDataSyncer extends DataSyncer {
getRedis().setUserServerSwitch(onlineUser); getRedis().setUserServerSwitch(onlineUser);
saveData( saveData(
onlineUser, onlineUser.createSnapshot(DataSnapshot.SaveCause.DISCONNECT), onlineUser, onlineUser.createSnapshot(DataSnapshot.SaveCause.DISCONNECT),
(user, data) -> getRedis().setUserData(user, data) (user, data) -> {
getRedis().setUserData(user, data);
plugin.unlockPlayer(user.getUuid());
}
); );
}); });
} }

View File

@@ -24,6 +24,8 @@ import net.william278.husksync.data.DataSnapshot;
import net.william278.husksync.user.OnlineUser; import net.william278.husksync.user.OnlineUser;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import java.util.Optional;
public class LockstepDataSyncer extends DataSyncer { public class LockstepDataSyncer extends DataSyncer {
public LockstepDataSyncer(@NotNull HuskSync plugin) { public LockstepDataSyncer(@NotNull HuskSync plugin) {
@@ -44,13 +46,19 @@ public class LockstepDataSyncer extends DataSyncer {
@Override @Override
public void syncApplyUserData(@NotNull OnlineUser user) { public void syncApplyUserData(@NotNull OnlineUser user) {
this.listenForRedisData(user, () -> { this.listenForRedisData(user, () -> {
if (user.isOffline()) { if (user.cannotApplySnapshot()) {
plugin.debug("Not applying data for offline user %s".formatted(user.getName())); plugin.debug("Not checking data state for user who has gone offline: %s".formatted(user.getName()));
return false; return false;
} }
if (getRedis().getUserCheckedOut(user).isPresent()) {
// If they are checked out, ask the server to check them back in and return false
final Optional<String> server = getRedis().getUserCheckedOut(user);
if (server.isPresent() && !server.get().equals(plugin.getServerName())) {
// getRedis().petitionServerCheckin(server.get(), user);
return false; return false;
} }
// If they are checked in - or checked out on *this* server - we can apply their latest data
getRedis().setUserCheckedOut(user, true); getRedis().setUserCheckedOut(user, true);
getRedis().getUserData(user).ifPresentOrElse( getRedis().getUserData(user).ifPresentOrElse(
data -> user.applySnapshot(data, DataSnapshot.UpdateCause.SYNCHRONIZED), data -> user.applySnapshot(data, DataSnapshot.UpdateCause.SYNCHRONIZED),
@@ -67,6 +75,7 @@ public class LockstepDataSyncer extends DataSyncer {
(user, data) -> { (user, data) -> {
getRedis().setUserData(user, data); getRedis().setUserData(user, data);
getRedis().setUserCheckedOut(user, false); getRedis().setUserCheckedOut(user, false);
plugin.unlockPlayer(user.getUuid());
} }
)); ));
} }

View File

@@ -43,11 +43,27 @@ public abstract class OnlineUser extends User implements CommandUser, UserDataHo
} }
/** /**
* Indicates if the player has gone offline * Indicates if the player is offline
* *
* @return {@code true} if the player has left the server; {@code false} otherwise * @return {@code true} if the player has left the server; {@code false} otherwise
* @deprecated use {@code hasDisconnected} instead
*/ */
public abstract boolean isOffline(); @Deprecated(since = "3.8")
public boolean isOffline() {
return hasDisconnected();
}
public abstract boolean hasDisconnected();
// Users cannot have snapshots applied if they have disconnected!
@Override
public boolean cannotApplySnapshot() {
if (hasDisconnected()) {
getPlugin().debug("[%s] Cannot apply snapshot as user is offline!".formatted(getName()));
return true;
}
return false;
}
@NotNull @NotNull
@Override @Override
@@ -117,7 +133,7 @@ public abstract class OnlineUser extends User implements CommandUser, UserDataHo
/** /**
* Set a player's status from a {@link DataSnapshot} * Apply a {@link DataSnapshot} to a player, updating their data
* *
* @param snapshot The {@link DataSnapshot} to set the player's status from * @param snapshot The {@link DataSnapshot} to set the player's status from
* @param cause The {@link DataSnapshot.UpdateCause} of the snapshot * @param cause The {@link DataSnapshot.UpdateCause} of the snapshot
@@ -125,14 +141,12 @@ public abstract class OnlineUser extends User implements CommandUser, UserDataHo
*/ */
public void applySnapshot(@NotNull DataSnapshot.Packed snapshot, @NotNull DataSnapshot.UpdateCause cause) { public void applySnapshot(@NotNull DataSnapshot.Packed snapshot, @NotNull DataSnapshot.UpdateCause cause) {
getPlugin().fireEvent(getPlugin().getPreSyncEvent(this, snapshot), (event) -> { getPlugin().fireEvent(getPlugin().getPreSyncEvent(this, snapshot), (event) -> {
if (!isOffline()) { getPlugin().debug(String.format("Attempting to apply snapshot (%s) to %s (cause: %s)",
getPlugin().debug(String.format("Applying snapshot (%s) to %s (cause: %s)",
snapshot.getShortId(), getName(), cause.getDisplayName() snapshot.getShortId(), getName(), cause.getDisplayName()
)); ));
UserDataHolder.super.applySnapshot( UserDataHolder.super.applySnapshot(
event.getData(), (succeeded) -> completeSync(succeeded, cause, getPlugin()) event.getData(), (succeeded) -> completeSync(succeeded, cause, getPlugin())
); );
}
}); });
} }

View File

@@ -54,7 +54,6 @@ import net.william278.husksync.database.PostgresDatabase;
import net.william278.husksync.event.FabricEventDispatcher; import net.william278.husksync.event.FabricEventDispatcher;
import net.william278.husksync.event.ModLoadedCallback; import net.william278.husksync.event.ModLoadedCallback;
import net.william278.husksync.hook.PlanHook; import net.william278.husksync.hook.PlanHook;
import net.william278.husksync.listener.EventListener;
import net.william278.husksync.listener.FabricEventListener; import net.william278.husksync.listener.FabricEventListener;
import net.william278.husksync.listener.LockedHandler; import net.william278.husksync.listener.LockedHandler;
import net.william278.husksync.migrator.Migrator; import net.william278.husksync.migrator.Migrator;
@@ -109,6 +108,7 @@ public class FabricHuskSync implements DedicatedServerModInitializer, HuskSync,
private final Map<String, Boolean> permissions = Maps.newHashMap(); private final Map<String, Boolean> permissions = Maps.newHashMap();
private final List<Migrator> availableMigrators = Lists.newArrayList(); private final List<Migrator> availableMigrators = Lists.newArrayList();
private final Set<UUID> lockedPlayers = Sets.newConcurrentHashSet(); private final Set<UUID> lockedPlayers = Sets.newConcurrentHashSet();
private final Set<UUID> disconnectingPlayers = Sets.newConcurrentHashSet();
private final Map<UUID, FabricUser> playerMap = Maps.newConcurrentMap(); private final Map<UUID, FabricUser> playerMap = Maps.newConcurrentMap();
private Logger logger; private Logger logger;

View File

@@ -34,7 +34,10 @@ public interface FabricUserDataHolder extends UserDataHolder {
@Override @Override
default Optional<? extends Data> getData(@NotNull Identifier id) { default Optional<? extends Data> getData(@NotNull Identifier id) {
if (!id.isCustom()) { if (id.isCustom()) {
return Optional.ofNullable(getCustomDataStore().get(id));
}
try { try {
return switch (id.getKeyValue()) { return switch (id.getKeyValue()) {
case "inventory" -> getInventory(); case "inventory" -> getInventory();
@@ -53,11 +56,10 @@ public interface FabricUserDataHolder extends UserDataHolder {
default -> throw new IllegalStateException(String.format("Unexpected data type: %s", id)); default -> throw new IllegalStateException(String.format("Unexpected data type: %s", id));
}; };
} catch (Throwable e) { } catch (Throwable e) {
getPlugin().debug("Failed to get data for key: " + id.getKeyValue(), e); getPlugin().debug("Failed to get data for key: " + id.asMinimalString(), e);
return Optional.empty();
} }
} }
return Optional.ofNullable(getCustomDataStore().get(id));
}
@Override @Override
default void setData(@NotNull Identifier id, @NotNull Data data) { default void setData(@NotNull Identifier id, @NotNull Data data) {

View File

@@ -64,8 +64,9 @@ public class FabricUser extends OnlineUser implements FabricUserDataHolder {
} }
@Override @Override
public boolean isOffline() { public boolean hasDisconnected() {
return player == null || player.isDisconnected(); return getPlugin().getDisconnectingPlayers().contains(getUuid())
|| player == null || player.isDisconnected();
} }
@NotNull @NotNull