9
0
mirror of https://github.com/WiIIiam278/HuskSync.git synced 2025-12-25 01:29:19 +00:00

Introduce new lockstep syncing system, modularize sync modes (#178)

* Start work on modular sync systems

* Add experimental lockstep sync system, close #69

* Refactor RedisMessageType enum

* Fixup lockstep syncing

* Bump to 3.1

* Update docs with details about the new Sync Modes

* Sync mode config key is `mode` instead of `type`

* Add server to data snapshot overview

* API: Add API for setting data syncers

* Fixup weird statistic matching logic
This commit is contained in:
William
2023-10-05 18:05:02 +01:00
committed by GitHub
parent 03ca335293
commit cae17f6e68
37 changed files with 777 additions and 226 deletions

View File

@@ -28,6 +28,7 @@ import net.william278.desertwell.util.UpdateChecker;
import net.william278.desertwell.util.Version;
import net.william278.husksync.adapter.DataAdapter;
import net.william278.husksync.config.Locales;
import net.william278.husksync.config.Server;
import net.william278.husksync.config.Settings;
import net.william278.husksync.data.Data;
import net.william278.husksync.data.Identifier;
@@ -36,6 +37,7 @@ import net.william278.husksync.database.Database;
import net.william278.husksync.event.EventDispatcher;
import net.william278.husksync.migrator.Migrator;
import net.william278.husksync.redis.RedisManager;
import net.william278.husksync.sync.DataSyncer;
import net.william278.husksync.user.ConsoleUser;
import net.william278.husksync.user.OnlineUser;
import net.william278.husksync.util.LegacyConverter;
@@ -90,6 +92,11 @@ public interface HuskSync extends Task.Supplier, EventDispatcher {
@NotNull
RedisManager getRedisManager();
/**
* Returns the implementing adapter for serializing data
*
* @return the {@link DataAdapter}
*/
@NotNull
DataAdapter getDataAdapter();
@@ -130,6 +137,21 @@ public interface HuskSync extends Task.Supplier, EventDispatcher {
return getSerializers().keySet();
}
/**
* Returns the data syncer implementation
*
* @return the {@link DataSyncer} implementation
*/
@NotNull
DataSyncer getDataSyncer();
/**
* Set the data syncer implementation
*
* @param dataSyncer the {@link DataSyncer} implementation
*/
void setDataSyncer(@NotNull DataSyncer dataSyncer);
/**
* Returns a list of available data {@link Migrator}s
*
@@ -167,6 +189,11 @@ public interface HuskSync extends Task.Supplier, EventDispatcher {
void setSettings(@NotNull Settings settings);
@NotNull
String getServerName();
void setServer(@NotNull Server server);
/**
* Returns the plugin {@link Locales}
*
@@ -255,7 +282,7 @@ public interface HuskSync extends Task.Supplier, EventDispatcher {
String getPlatformType();
/**
* Returns the legacy data converter, if it exists
* Returns the legacy data converter if it exists
*
* @return the {@link LegacyConverter}
*/
@@ -269,6 +296,9 @@ public interface HuskSync extends Task.Supplier, EventDispatcher {
// Load settings
setSettings(Annotaml.create(new File(getDataFolder(), "config.yml"), Settings.class).get());
// Load server name
setServer(Annotaml.create(new File(getDataFolder(), "server.yml"), Server.class).get());
// Load locales from language preset default
final Locales languagePresets = Annotaml.create(
Locales.class,
@@ -305,12 +335,31 @@ public interface HuskSync extends Task.Supplier, EventDispatcher {
}
}
/**
* Get the set of UUIDs of "locked players", for which events will be canceled.
* </p>
* Players are locked while their items are being set (on join) or saved (on quit)
*/
@NotNull
Set<UUID> getLockedPlayers();
default boolean isLocked(@NotNull UUID uuid) {
return getLockedPlayers().contains(uuid);
}
default void lockPlayer(@NotNull UUID uuid) {
getLockedPlayers().add(uuid);
}
default void unlockPlayer(@NotNull UUID uuid) {
getLockedPlayers().remove(uuid);
}
@NotNull
Gson getGson();
boolean isDisabling();
@NotNull
default Gson createGson() {
return Converters.registerOffsetDateTime(new GsonBuilder()).create();

View File

@@ -26,6 +26,7 @@ import net.william278.husksync.data.Data;
import net.william278.husksync.data.DataSnapshot;
import net.william278.husksync.data.Identifier;
import net.william278.husksync.data.Serializer;
import net.william278.husksync.sync.DataSyncer;
import net.william278.husksync.user.OnlineUser;
import net.william278.husksync.user.User;
import org.jetbrains.annotations.ApiStatus;
@@ -404,6 +405,7 @@ public abstract class HuskSyncAPI {
* @param <T> The type of the element
* @return The deserialized element
* @throws Serializer.DeserializationException If the element could not be deserialized
* @since 3.0
*/
@NotNull
public <T extends Adaptable> T deserializeData(@NotNull String serialized, Class<T> type)
@@ -418,6 +420,7 @@ public abstract class HuskSyncAPI {
* @param <T> The type of the element
* @return The serialized JSON string
* @throws Serializer.SerializationException If the element could not be serialized
* @since 3.0
*/
@NotNull
public <T extends Adaptable> String serializeData(@NotNull T element)
@@ -425,6 +428,16 @@ public abstract class HuskSyncAPI {
return plugin.getDataAdapter().toJson(element);
}
/**
* Set the {@link DataSyncer} to be used to sync data
*
* @param syncer The data syncer to use for synchronizing user data
* @since 3.1
*/
public void setDataSyncer(@NotNull DataSyncer syncer) {
plugin.setDataSyncer(syncer);
}
/**
* <b>(Internal use only)</b> - Get the plugin instance
*

View File

@@ -0,0 +1,77 @@
/*
* This file is part of HuskSync, licensed under the Apache License 2.0.
*
* Copyright (c) William278 <will27528@gmail.com>
* Copyright (c) contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.william278.husksync.config;
import net.william278.annotaml.YamlFile;
import net.william278.annotaml.YamlKey;
import org.jetbrains.annotations.NotNull;
import java.nio.file.Path;
/**
* Represents a server on a proxied network.
*/
@YamlFile(header = """
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ HuskSync Server ID config ┃
┃ Developed by William278 ┃
┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
┣╸ This file should contain the ID of this server as defined in your proxy config.
┗╸ If you join it using /server alpha, then set it to 'alpha' (case-sensitive)""")
public class Server {
/**
* Default server identifier.
*/
@NotNull
public static String getDefaultServerName() {
try {
final Path serverDirectory = Path.of(System.getProperty("user.dir"));
return serverDirectory.getFileName().toString().trim();
} catch (Exception e) {
return "server";
}
}
@YamlKey("name")
private String serverName = getDefaultServerName();
@SuppressWarnings("unused")
private Server() {
}
@Override
public boolean equals(@NotNull Object other) {
// If the name of this server matches another, the servers are the same.
if (other instanceof Server server) {
return server.getName().equalsIgnoreCase(this.getName());
}
return super.equals(other);
}
/**
* Proxy-defined name of this server.
*/
@NotNull
public String getName() {
return serverName;
}
}

View File

@@ -26,6 +26,7 @@ import net.william278.husksync.data.DataSnapshot;
import net.william278.husksync.data.Identifier;
import net.william278.husksync.database.Database;
import net.william278.husksync.listener.EventListener;
import net.william278.husksync.sync.DataSyncer;
import org.jetbrains.annotations.NotNull;
import java.util.*;
@@ -44,7 +45,7 @@ import java.util.*;
public class Settings {
// Top-level settings
@YamlComment("Locale of the default language file to use. Docs: https://william278.net/docs/huskhomes/translations")
@YamlComment("Locale of the default language file to use. Docs: https://william278.net/docs/husksync/translations")
@YamlKey("language")
private String language = "en-gb";
@@ -135,6 +136,11 @@ public class Settings {
// Synchronization settings
@YamlComment("The mode of data synchronization to use (DELAY or LOCKSTEP). DELAY should be fine for most networks."
+ " Docs: https://william278.net/docs/husksync/sync-modes")
@YamlKey("synchronization.mode")
private DataSyncer.Mode syncMode = DataSyncer.Mode.DELAY;
@YamlComment("The number of data snapshot backups that should be kept at once per user")
@YamlKey("synchronization.max_user_data_snapshots")
private int maxUserDataSnapshots = 16;
@@ -150,7 +156,6 @@ public class Settings {
DataSnapshot.SaveCause.INVENTORY_COMMAND.name(),
DataSnapshot.SaveCause.ENDERCHEST_COMMAND.name(),
DataSnapshot.SaveCause.BACKUP_RESTORE.name(),
DataSnapshot.SaveCause.CONVERTED_FROM_V2.name(),
DataSnapshot.SaveCause.LEGACY_MIGRATION.name(),
DataSnapshot.SaveCause.MPDB_MIGRATION.name()
);
@@ -188,7 +193,7 @@ public class Settings {
@YamlKey("synchronization.synchronize_dead_players_changing_server")
private boolean synchronizeDeadPlayersChangingServer = true;
@YamlComment("How long, in milliseconds, this server should wait for a response from the redis server before "
@YamlComment("If using the DELAY sync method, how long should this server listen for Redis key data updates before "
+ "pulling data from the database instead (i.e., if the user did not change servers).")
@YamlKey("synchronization.network_latency_milliseconds")
private int networkLatencyMilliseconds = 500;
@@ -315,6 +320,11 @@ public class Settings {
return redisUseSsl;
}
@NotNull
public DataSyncer.Mode getSyncMode() {
return syncMode;
}
public int getMaxUserDataSnapshots() {
return maxUserDataSnapshots;
}

View File

@@ -45,9 +45,9 @@ public class DataSnapshot {
/*
* Current version of the snapshot data format.
* HuskSync v3.0 uses v4; HuskSync v2.0 uses v1-v3
* HuskSync v3.1 uses v5, v3.0 uses v4; v2.0 uses v1-v3
*/
protected static final int CURRENT_FORMAT_VERSION = 4;
protected static final int CURRENT_FORMAT_VERSION = 5;
@SerializedName("id")
protected UUID id;
@@ -61,6 +61,9 @@ public class DataSnapshot {
@SerializedName("save_cause")
protected SaveCause saveCause;
@SerializedName("server_name")
protected String serverName;
@SerializedName("minecraft_version")
protected String minecraftVersion;
@@ -74,12 +77,13 @@ public class DataSnapshot {
protected Map<String, String> data;
private DataSnapshot(@NotNull UUID id, boolean pinned, @NotNull OffsetDateTime timestamp,
@NotNull SaveCause saveCause, @NotNull Map<String, String> data,
@NotNull SaveCause saveCause, @NotNull String serverName, @NotNull Map<String, String> data,
@NotNull Version minecraftVersion, @NotNull String platformType, int formatVersion) {
this.id = id;
this.pinned = pinned;
this.timestamp = timestamp;
this.saveCause = saveCause;
this.serverName = serverName;
this.data = data;
this.minecraftVersion = minecraftVersion.toStringWithoutMetadata();
this.platformType = platformType;
@@ -114,7 +118,7 @@ public class DataSnapshot {
"Please ensure each server is running the latest version of HuskSync.",
snapshot.getFormatVersion(), CURRENT_FORMAT_VERSION));
}
if (snapshot.getFormatVersion() < CURRENT_FORMAT_VERSION) {
if (snapshot.getFormatVersion() < 4) {
if (plugin.getLegacyConverter().isPresent()) {
return plugin.getLegacyConverter().get().convert(
data,
@@ -195,13 +199,26 @@ public class DataSnapshot {
return saveCause;
}
/**
* Get the server the snapshot was created on.
* <p>
* Note that snapshots generated before v3.1 will return {@code "N/A"}
*
* @return The server name
* @since 3.1
*/
@NotNull
public String getServerName() {
return Optional.ofNullable(serverName).orElse("N/A");
}
/**
* Set why the snapshot was created
*
* @param saveCause The {@link SaveCause data save cause} of the snapshot
* @since 3.0
*/
public void setSaveCause(SaveCause saveCause) {
public void setSaveCause(@NotNull SaveCause saveCause) {
this.saveCause = saveCause;
}
@@ -256,9 +273,9 @@ public class DataSnapshot {
public static class Packed extends DataSnapshot implements Adaptable {
protected Packed(@NotNull UUID id, boolean pinned, @NotNull OffsetDateTime timestamp,
@NotNull SaveCause saveCause, @NotNull Map<String, String> data,
@NotNull SaveCause saveCause, @NotNull String serverName, @NotNull Map<String, String> data,
@NotNull Version minecraftVersion, @NotNull String platformType, int formatVersion) {
super(id, pinned, timestamp, saveCause, data, minecraftVersion, platformType, formatVersion);
super(id, pinned, timestamp, saveCause, serverName, data, minecraftVersion, platformType, formatVersion);
}
@SuppressWarnings("unused")
@@ -282,8 +299,8 @@ public class DataSnapshot {
@NotNull
public Packed copy() {
return new Packed(
UUID.randomUUID(), pinned, OffsetDateTime.now(), saveCause, data,
getMinecraftVersion(), platformType, formatVersion
UUID.randomUUID(), pinned, OffsetDateTime.now(), saveCause, serverName,
data, getMinecraftVersion(), platformType, formatVersion
);
}
@@ -307,7 +324,7 @@ public class DataSnapshot {
@NotNull
public DataSnapshot.Unpacked unpack(@NotNull HuskSync plugin) {
return new Unpacked(
id, pinned, timestamp, saveCause, data,
id, pinned, timestamp, saveCause, serverName, data,
getMinecraftVersion(), platformType, formatVersion, plugin
);
}
@@ -325,17 +342,17 @@ public class DataSnapshot {
private final Map<Identifier, Data> deserialized;
private Unpacked(@NotNull UUID id, boolean pinned, @NotNull OffsetDateTime timestamp,
@NotNull SaveCause saveCause, @NotNull Map<String, String> data,
@NotNull SaveCause saveCause, @NotNull String serverName, @NotNull Map<String, String> data,
@NotNull Version minecraftVersion, @NotNull String platformType, int formatVersion,
@NotNull HuskSync plugin) {
super(id, pinned, timestamp, saveCause, data, minecraftVersion, platformType, formatVersion);
super(id, pinned, timestamp, saveCause, serverName, data, minecraftVersion, platformType, formatVersion);
this.deserialized = deserializeData(plugin);
}
private Unpacked(@NotNull UUID id, boolean pinned, @NotNull OffsetDateTime timestamp,
@NotNull SaveCause saveCause, @NotNull Map<Identifier, Data> data,
@NotNull SaveCause saveCause, @NotNull String serverName, @NotNull Map<Identifier, Data> data,
@NotNull Version minecraftVersion, @NotNull String platformType, int formatVersion) {
super(id, pinned, timestamp, saveCause, Map.of(), minecraftVersion, platformType, formatVersion);
super(id, pinned, timestamp, saveCause, serverName, Map.of(), minecraftVersion, platformType, formatVersion);
this.deserialized = data;
}
@@ -384,7 +401,7 @@ public class DataSnapshot {
@ApiStatus.Internal
public DataSnapshot.Packed pack(@NotNull HuskSync plugin) {
return new DataSnapshot.Packed(
id, pinned, timestamp, saveCause, serializeData(plugin),
id, pinned, timestamp, saveCause, serverName, serializeData(plugin),
getMinecraftVersion(), platformType, formatVersion
);
}
@@ -402,6 +419,7 @@ public class DataSnapshot {
private final HuskSync plugin;
private UUID id;
private SaveCause saveCause;
private String serverName;
private boolean pinned;
private OffsetDateTime timestamp;
private final Map<Identifier, Data> data;
@@ -412,6 +430,7 @@ public class DataSnapshot {
this.data = new HashMap<>();
this.timestamp = OffsetDateTime.now();
this.id = UUID.randomUUID();
this.serverName = plugin.getServerName();
}
/**
@@ -441,6 +460,19 @@ public class DataSnapshot {
return this;
}
/**
* Set the name of the server where this snapshot was created
*
* @param serverName The server name
* @return The builder
* @since 3.1
*/
@NotNull
public Builder serverName(@NotNull String serverName) {
this.serverName = serverName;
return this;
}
/**
* Set whether the data should be pinned
*
@@ -686,6 +718,7 @@ public class DataSnapshot {
pinned || plugin.getSettings().doAutoPin(saveCause),
timestamp,
saveCause,
serverName,
data,
plugin.getMinecraftVersion(),
plugin.getPlatformType(),

View File

@@ -23,13 +23,12 @@ import net.william278.husksync.HuskSync;
import net.william278.husksync.data.Data;
import net.william278.husksync.data.DataSnapshot;
import net.william278.husksync.user.OnlineUser;
import net.william278.husksync.util.Task;
import org.jetbrains.annotations.NotNull;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
/**
* Handles what should happen when events are fired
@@ -39,22 +38,8 @@ public abstract class EventListener {
// The plugin instance
protected final HuskSync plugin;
/**
* Set of UUIDs of "locked players", for which events will be canceled.
* </p>
* Players are locked while their items are being set (on join) or saved (on quit)
*/
private final Set<UUID> lockedPlayers;
/**
* Whether the plugin is currently being disabled
*/
private boolean disabling;
protected EventListener(@NotNull HuskSync plugin) {
this.plugin = plugin;
this.lockedPlayers = new HashSet<>();
this.disabling = false;
}
/**
@@ -66,51 +51,8 @@ public abstract class EventListener {
if (user.isNpc()) {
return;
}
lockedPlayers.add(user.getUuid());
plugin.runAsyncDelayed(() -> {
// Fetch from the database if the user isn't changing servers
if (!plugin.getRedisManager().getUserServerSwitch(user)) {
this.setUserFromDatabase(user);
return;
}
// Set the user as soon as the source server has set the data to redis
final long MAX_ATTEMPTS = 16L;
final AtomicLong timesRun = new AtomicLong(0L);
final AtomicReference<Task.Repeating> task = new AtomicReference<>();
final Runnable runnable = () -> {
if (user.isOffline()) {
task.get().cancel();
return;
}
if (disabling || timesRun.getAndIncrement() > MAX_ATTEMPTS) {
task.get().cancel();
this.setUserFromDatabase(user);
return;
}
plugin.getRedisManager().getUserData(user).ifPresent(redisData -> {
task.get().cancel();
user.applySnapshot(redisData, DataSnapshot.UpdateCause.SYNCHRONIZED);
});
};
task.set(plugin.getRepeatingTask(runnable, 10));
task.get().run();
}, Math.max(0, plugin.getSettings().getNetworkLatencyMilliseconds() / 50L));
}
/**
* Set a user's data from the database
*
* @param user The user to set the data for
*/
private void setUserFromDatabase(@NotNull OnlineUser user) {
plugin.getDatabase().getLatestSnapshot(user).ifPresentOrElse(
snapshot -> user.applySnapshot(snapshot, DataSnapshot.UpdateCause.SYNCHRONIZED),
() -> user.completeSync(true, DataSnapshot.UpdateCause.NEW_USER, plugin)
);
plugin.lockPlayer(user.getUuid());
plugin.getDataSyncer().setUserData(user);
}
/**
@@ -119,27 +61,11 @@ public abstract class EventListener {
* @param user The {@link OnlineUser} to handle
*/
protected final void handlePlayerQuit(@NotNull OnlineUser user) {
// Players quitting have their data manually saved when the plugin is disabled
if (disabling) {
if (user.isNpc() || plugin.isDisabling() || plugin.isLocked(user.getUuid())) {
return;
}
// Don't sync players awaiting synchronization
if (lockedPlayers.contains(user.getUuid()) || user.isNpc()) {
return;
}
// Handle disconnection
try {
lockedPlayers.add(user.getUuid());
plugin.getRedisManager().setUserServerSwitch(user).thenRun(() -> {
final DataSnapshot.Packed data = user.createSnapshot(DataSnapshot.SaveCause.DISCONNECT);
plugin.getRedisManager().setUserData(user, data);
plugin.getDatabase().addSnapshot(user, data);
});
} catch (Throwable e) {
plugin.log(Level.SEVERE, "An exception occurred handling a player disconnection", e);
}
plugin.lockPlayer(user.getUuid());
plugin.runAsync(() -> plugin.getDataSyncer().saveUserData(user));
}
/**
@@ -148,11 +74,11 @@ public abstract class EventListener {
* @param usersInWorld a list of users in the world that is being saved
*/
protected final void saveOnWorldSave(@NotNull List<OnlineUser> usersInWorld) {
if (disabling || !plugin.getSettings().doSaveOnWorldSave()) {
if (plugin.isDisabling() || !plugin.getSettings().doSaveOnWorldSave()) {
return;
}
usersInWorld.stream()
.filter(user -> !lockedPlayers.contains(user.getUuid()) && !user.isNpc())
.filter(user -> !plugin.isLocked(user.getUuid()) && !user.isNpc())
.forEach(user -> plugin.getDatabase().addSnapshot(
user, user.createSnapshot(DataSnapshot.SaveCause.WORLD_SAVE)
));
@@ -165,8 +91,8 @@ public abstract class EventListener {
* @param drops The items that this user would have dropped
*/
protected void saveOnPlayerDeath(@NotNull OnlineUser user, @NotNull Data.Items drops) {
if (disabling || !plugin.getSettings().doSaveOnDeath() || lockedPlayers.contains(user.getUuid()) || user.isNpc()
|| (!plugin.getSettings().doSaveEmptyDropsOnDeath() && drops.isEmpty())) {
if (plugin.isDisabling() || !plugin.getSettings().doSaveOnDeath() || plugin.isLocked(user.getUuid())
|| user.isNpc() || (!plugin.getSettings().doSaveEmptyDropsOnDeath() && drops.isEmpty())) {
return;
}
@@ -182,20 +108,18 @@ public abstract class EventListener {
* @return Whether the event should be canceled
*/
protected final boolean cancelPlayerEvent(@NotNull UUID userUuid) {
return disabling || lockedPlayers.contains(userUuid);
return plugin.isDisabling() || plugin.isLocked(userUuid);
}
/**
* Handle the plugin disabling
*/
public final void handlePluginDisable() {
disabling = true;
// Save data for all online users
// Save for all online players
plugin.getOnlineUsers().stream()
.filter(user -> !lockedPlayers.contains(user.getUuid()) && !user.isNpc())
.filter(user -> !plugin.isLocked(user.getUuid()) && !user.isNpc())
.forEach(user -> {
lockedPlayers.add(user.getUuid());
plugin.lockPlayer(user.getUuid());
plugin.getDatabase().addSnapshot(user, user.createSnapshot(DataSnapshot.SaveCause.SERVER_SHUTDOWN));
});
@@ -204,10 +128,6 @@ public abstract class EventListener {
plugin.getRedisManager().terminate();
}
public final Set<UUID> getLockedPlayers() {
return this.lockedPlayers;
}
/**
* Represents priorities for events that HuskSync listens to
*/

View File

@@ -24,9 +24,9 @@ import org.jetbrains.annotations.NotNull;
import java.util.Locale;
public enum RedisKeyType {
CACHE(60 * 60 * 24),
DATA_UPDATE(10),
SERVER_SWITCH(10);
SERVER_SWITCH(10),
DATA_CHECKOUT(60 * 60 * 24 * 7 * 52);
private final int timeToLive;

View File

@@ -92,7 +92,7 @@ public class RedisManager extends JedisPubSub {
try (Jedis jedis = jedisPool.getResource()) {
jedis.subscribe(
this,
Arrays.stream(RedisMessageType.values())
Arrays.stream(RedisMessage.Type.values())
.map(type -> type.getMessageChannel(clusterId))
.toArray(String[]::new)
);
@@ -101,7 +101,7 @@ public class RedisManager extends JedisPubSub {
@Override
public void onMessage(@NotNull String channel, @NotNull String message) {
final RedisMessageType messageType = RedisMessageType.getTypeFromChannel(channel, clusterId).orElse(null);
final RedisMessage.Type messageType = RedisMessage.Type.getTypeFromChannel(channel, clusterId).orElse(null);
if (messageType == null) {
return;
}
@@ -118,7 +118,7 @@ public class RedisManager extends JedisPubSub {
user -> RedisMessage.create(
UUID.fromString(new String(redisMessage.getPayload(), StandardCharsets.UTF_8)),
user.createSnapshot(DataSnapshot.SaveCause.INVENTORY_COMMAND).asBytes(plugin)
).dispatch(plugin, RedisMessageType.RETURN_USER_DATA)
).dispatch(plugin, RedisMessage.Type.RETURN_USER_DATA)
);
case RETURN_USER_DATA -> {
final CompletableFuture<Optional<DataSnapshot.Packed>> future = pendingRequests.get(
@@ -142,7 +142,7 @@ public class RedisManager extends JedisPubSub {
public void sendUserDataUpdate(@NotNull User user, @NotNull DataSnapshot.Packed data) {
plugin.runAsync(() -> {
final RedisMessage redisMessage = RedisMessage.create(user.getUuid(), data.asBytes(plugin));
redisMessage.dispatch(plugin, RedisMessageType.UPDATE_USER_DATA);
redisMessage.dispatch(plugin, RedisMessage.Type.UPDATE_USER_DATA);
});
}
@@ -162,7 +162,7 @@ public class RedisManager extends JedisPubSub {
user.getUuid(),
requestId.toString().getBytes(StandardCharsets.UTF_8)
);
redisMessage.dispatch(plugin, RedisMessageType.REQUEST_USER_DATA);
redisMessage.dispatch(plugin, RedisMessage.Type.REQUEST_USER_DATA);
});
return future.orTimeout(
plugin.getSettings().getNetworkLatencyMilliseconds(),
@@ -180,44 +180,96 @@ public class RedisManager extends JedisPubSub {
* @param user the user to set data for
* @param data the user's data to set
*/
@Blocking
public void setUserData(@NotNull User user, @NotNull DataSnapshot.Packed data) {
plugin.runAsync(() -> {
try (Jedis jedis = jedisPool.getResource()) {
jedis.setex(
getKey(RedisKeyType.DATA_UPDATE, user.getUuid(), clusterId),
RedisKeyType.DATA_UPDATE.getTimeToLive(),
data.asBytes(plugin)
try (Jedis jedis = jedisPool.getResource()) {
jedis.setex(
getKey(RedisKeyType.DATA_UPDATE, user.getUuid(), clusterId),
RedisKeyType.DATA_UPDATE.getTimeToLive(),
data.asBytes(plugin)
);
plugin.debug(String.format("[%s] Set %s key to redis at: %s", user.getUsername(),
RedisKeyType.DATA_UPDATE.name(), new SimpleDateFormat("mm:ss.SSS").format(new Date())));
} catch (Throwable e) {
plugin.log(Level.SEVERE, "An exception occurred setting a user's server switch", e);
}
}
@Blocking
public void setUserCheckedOut(@NotNull User user, boolean checkedOut) {
try (Jedis jedis = jedisPool.getResource()) {
if (checkedOut) {
jedis.set(
getKey(RedisKeyType.DATA_CHECKOUT, user.getUuid(), clusterId),
plugin.getServerName().getBytes(StandardCharsets.UTF_8)
);
plugin.debug(String.format("[%s] Set %s key to redis at: %s", user.getUsername(),
RedisKeyType.DATA_UPDATE.name(), new SimpleDateFormat("mm:ss.SSS").format(new Date())));
} catch (Throwable e) {
plugin.log(Level.SEVERE, "An exception occurred setting a user's server switch", e);
} else {
jedis.del(getKey(RedisKeyType.DATA_CHECKOUT, user.getUuid(), clusterId));
}
});
plugin.debug(String.format("[%s] %s %s key to redis at: %s",
checkedOut ? "set" : "removed", user.getUsername(), RedisKeyType.DATA_CHECKOUT.name(),
new SimpleDateFormat("mm:ss.SSS").format(new Date())));
} catch (Throwable e) {
plugin.log(Level.SEVERE, "An exception occurred setting a user's server switch", e);
}
}
@Blocking
public Optional<String> getUserCheckedOut(@NotNull User user) {
try (Jedis jedis = jedisPool.getResource()) {
final byte[] key = getKey(RedisKeyType.DATA_CHECKOUT, user.getUuid(), clusterId);
final byte[] readData = jedis.get(key);
if (readData != null) {
plugin.debug("[" + user.getUsername() + "] Successfully read "
+ RedisKeyType.DATA_CHECKOUT.name() + " key from redis at: " +
new SimpleDateFormat("mm:ss.SSS").format(new Date()));
return Optional.of(new String(readData, StandardCharsets.UTF_8));
}
} catch (Throwable e) {
plugin.log(Level.SEVERE, "An exception occurred fetching a user's checkout key from redis", e);
}
plugin.debug("[" + user.getUsername() + "] Could not read " +
RedisKeyType.DATA_CHECKOUT.name() + " key from redis at: " +
new SimpleDateFormat("mm:ss.SSS").format(new Date()));
return Optional.empty();
}
@Blocking
public void clearUsersCheckedOutOnServer() {
final String keyFormat = String.format("%s*", RedisKeyType.DATA_CHECKOUT.getKeyPrefix(clusterId));
try (Jedis jedis = jedisPool.getResource()) {
final Set<String> keys = jedis.keys(keyFormat);
if (keys == null) {
plugin.log(Level.WARNING, "Checkout key set returned null from jedis during clearing");
return;
}
for (String key : keys) {
if (jedis.get(key).equals(plugin.getServerName())) {
jedis.del(key);
}
}
} catch (Throwable e) {
plugin.log(Level.SEVERE, "An exception occurred clearing users checked out on this server", e);
}
}
/**
* Set a user's server switch to the Redis server
*
* @param user the user to set the server switch for
* @return a future returning void when complete
*/
public CompletableFuture<Void> setUserServerSwitch(@NotNull User user) {
final CompletableFuture<Void> future = new CompletableFuture<>();
plugin.runAsync(() -> {
try (Jedis jedis = jedisPool.getResource()) {
jedis.setex(
getKey(RedisKeyType.SERVER_SWITCH, user.getUuid(), clusterId),
RedisKeyType.SERVER_SWITCH.getTimeToLive(), new byte[0]
);
future.complete(null);
plugin.debug(String.format("[%s] Set %s key to redis at: %s", user.getUsername(),
RedisKeyType.SERVER_SWITCH.name(), new SimpleDateFormat("mm:ss.SSS").format(new Date())));
} catch (Throwable e) {
plugin.log(Level.SEVERE, "An exception occurred setting a user's server switch", e);
}
});
return future;
@Blocking
public void setUserServerSwitch(@NotNull User user) {
try (Jedis jedis = jedisPool.getResource()) {
jedis.setex(
getKey(RedisKeyType.SERVER_SWITCH, user.getUuid(), clusterId),
RedisKeyType.SERVER_SWITCH.getTimeToLive(), new byte[0]
);
plugin.debug(String.format("[%s] Set %s key to redis at: %s", user.getUsername(),
RedisKeyType.SERVER_SWITCH.name(), new SimpleDateFormat("mm:ss.SSS").format(new Date())));
} catch (Throwable e) {
plugin.log(Level.SEVERE, "An exception occurred setting a user's server switch", e);
}
}
/**
@@ -226,6 +278,7 @@ public class RedisManager extends JedisPubSub {
* @param user The user to fetch data for
* @return The user's data, if it's present on the database. Otherwise, an empty optional.
*/
@Blocking
public Optional<DataSnapshot.Packed> getUserData(@NotNull User user) {
try (Jedis jedis = jedisPool.getResource()) {
final byte[] key = getKey(RedisKeyType.DATA_UPDATE, user.getUuid(), clusterId);
@@ -251,6 +304,7 @@ public class RedisManager extends JedisPubSub {
}
}
@Blocking
public boolean getUserServerSwitch(@NotNull User user) {
try (Jedis jedis = jedisPool.getResource()) {
final byte[] key = getKey(RedisKeyType.SERVER_SWITCH, user.getUuid(), clusterId);
@@ -274,6 +328,7 @@ public class RedisManager extends JedisPubSub {
}
}
@Blocking
public void terminate() {
if (jedisPool != null) {
if (!jedisPool.isClosed()) {

View File

@@ -25,6 +25,9 @@ import net.william278.husksync.HuskSync;
import net.william278.husksync.adapter.Adaptable;
import org.jetbrains.annotations.NotNull;
import java.util.Arrays;
import java.util.Locale;
import java.util.Optional;
import java.util.UUID;
public class RedisMessage implements Adaptable {
@@ -53,7 +56,7 @@ public class RedisMessage implements Adaptable {
return plugin.getGson().fromJson(json, RedisMessage.class);
}
public void dispatch(@NotNull HuskSync plugin, @NotNull RedisMessageType type) {
public void dispatch(@NotNull HuskSync plugin, @NotNull Type type) {
plugin.runAsync(() -> plugin.getRedisManager().sendMessage(
type.getMessageChannel(plugin.getSettings().getClusterId()),
plugin.getGson().toJson(this)
@@ -77,4 +80,27 @@ public class RedisMessage implements Adaptable {
this.payload = payload;
}
public enum Type {
UPDATE_USER_DATA,
REQUEST_USER_DATA,
RETURN_USER_DATA;
@NotNull
public String getMessageChannel(@NotNull String clusterId) {
return String.format(
"%s:%s:%s",
RedisManager.KEY_NAMESPACE.toLowerCase(Locale.ENGLISH),
clusterId.toLowerCase(Locale.ENGLISH),
name().toLowerCase(Locale.ENGLISH)
);
}
public static Optional<Type> getTypeFromChannel(@NotNull String channel, @NotNull String clusterId) {
return Arrays.stream(values())
.filter(messageType -> messageType.getMessageChannel(clusterId).equalsIgnoreCase(channel))
.findFirst();
}
}
}

View File

@@ -1,50 +0,0 @@
/*
* This file is part of HuskSync, licensed under the Apache License 2.0.
*
* Copyright (c) William278 <will27528@gmail.com>
* Copyright (c) contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.william278.husksync.redis;
import org.jetbrains.annotations.NotNull;
import java.util.Arrays;
import java.util.Locale;
import java.util.Optional;
public enum RedisMessageType {
UPDATE_USER_DATA,
REQUEST_USER_DATA,
RETURN_USER_DATA;
@NotNull
public String getMessageChannel(@NotNull String clusterId) {
return String.format(
"%s:%s:%s",
RedisManager.KEY_NAMESPACE.toLowerCase(Locale.ENGLISH),
clusterId.toLowerCase(Locale.ENGLISH),
name().toLowerCase(Locale.ENGLISH)
);
}
public static Optional<RedisMessageType> getTypeFromChannel(@NotNull String channel, @NotNull String clusterId) {
return Arrays.stream(values())
.filter(messageType -> messageType.getMessageChannel(clusterId).equalsIgnoreCase(channel))
.findFirst();
}
}

View File

@@ -0,0 +1,152 @@
/*
* This file is part of HuskSync, licensed under the Apache License 2.0.
*
* Copyright (c) William278 <will27528@gmail.com>
* Copyright (c) contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.william278.husksync.sync;
import net.william278.husksync.HuskSync;
import net.william278.husksync.api.HuskSyncAPI;
import net.william278.husksync.data.DataSnapshot;
import net.william278.husksync.user.OnlineUser;
import net.william278.husksync.util.Task;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* Handles the synchronization of data when a player changes servers or logs in
*
* @since 3.1
*/
public abstract class DataSyncer {
private static final long BASE_LISTEN_ATTEMPTS = 16;
private static final long LISTEN_DELAY = 10;
protected final HuskSync plugin;
private final long maxListenAttempts;
@ApiStatus.Internal
protected DataSyncer(@NotNull HuskSync plugin) {
this.plugin = plugin;
this.maxListenAttempts = getMaxListenAttempts();
}
/**
* API-exposed constructor for a {@link DataSyncer}
*
* @param api instance of the {@link HuskSyncAPI}
*/
@SuppressWarnings("unused")
public DataSyncer(@NotNull HuskSyncAPI api) {
this(api.getPlugin());
}
/**
* Called when the plugin is enabled
*/
public void initialize() {
}
/**
* Called when the plugin is disabled
*/
public void terminate() {
}
/**
* Called when a user's data should be fetched and applied to them
*
* @param user the user to fetch data for
*/
public abstract void setUserData(@NotNull OnlineUser user);
/**
* Called when a user's data should be serialized and saved
*
* @param user the user to save
*/
public abstract void saveUserData(@NotNull OnlineUser user);
// Calculates the max attempts the system should listen for user data for based on the latency value
private long getMaxListenAttempts() {
return BASE_LISTEN_ATTEMPTS + (
(Math.max(100, plugin.getSettings().getNetworkLatencyMilliseconds()) / 1000) * 20 / LISTEN_DELAY
);
}
// Set a user's data from the database, or set them as a new user
@ApiStatus.Internal
protected void setUserFromDatabase(@NotNull OnlineUser user) {
plugin.getDatabase().getLatestSnapshot(user).ifPresentOrElse(
snapshot -> user.applySnapshot(snapshot, DataSnapshot.UpdateCause.SYNCHRONIZED),
() -> user.completeSync(true, DataSnapshot.UpdateCause.NEW_USER, plugin)
);
}
// Continuously listen for data from Redis
@ApiStatus.Internal
protected void listenForRedisData(@NotNull OnlineUser user, @NotNull Supplier<Boolean> completionSupplier) {
final AtomicLong timesRun = new AtomicLong(0L);
final AtomicReference<Task.Repeating> task = new AtomicReference<>();
final Runnable runnable = () -> {
if (user.isOffline()) {
task.get().cancel();
return;
}
if (plugin.isDisabling() || timesRun.getAndIncrement() > maxListenAttempts) {
task.get().cancel();
setUserFromDatabase(user);
return;
}
if (completionSupplier.get()) {
task.get().cancel();
}
};
task.set(plugin.getRepeatingTask(runnable, LISTEN_DELAY));
task.get().run();
}
/**
* Represents the different available default modes of {@link DataSyncer}
*
* @since 3.1
*/
public enum Mode {
DELAY(DelayDataSyncer::new),
LOCKSTEP(LockstepDataSyncer::new);
private final Function<HuskSync, ? extends DataSyncer> supplier;
Mode(@NotNull Function<HuskSync, ? extends DataSyncer> supplier) {
this.supplier = supplier;
}
@NotNull
public DataSyncer create(@NotNull HuskSync plugin) {
return supplier.apply(plugin);
}
}
}

View File

@@ -0,0 +1,69 @@
/*
* This file is part of HuskSync, licensed under the Apache License 2.0.
*
* Copyright (c) William278 <will27528@gmail.com>
* Copyright (c) contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.william278.husksync.sync;
import net.william278.husksync.HuskSync;
import net.william278.husksync.data.DataSnapshot;
import net.william278.husksync.user.OnlineUser;
import org.jetbrains.annotations.NotNull;
/**
* A data syncer which applies a network delay before checking the presence of user data
*/
public class DelayDataSyncer extends DataSyncer {
public DelayDataSyncer(@NotNull HuskSync plugin) {
super(plugin);
}
@Override
public void setUserData(@NotNull OnlineUser user) {
plugin.runAsyncDelayed(
() -> {
// Fetch from the database if the user isn't changing servers
if (!plugin.getRedisManager().getUserServerSwitch(user)) {
this.setUserFromDatabase(user);
return;
}
// Listen for the data to be updated
this.listenForRedisData(
user,
() -> plugin.getRedisManager().getUserData(user).map(data -> {
user.applySnapshot(data, DataSnapshot.UpdateCause.SYNCHRONIZED);
return true;
}).orElse(false)
);
},
Math.max(0, plugin.getSettings().getNetworkLatencyMilliseconds() / 50L)
);
}
@Override
public void saveUserData(@NotNull OnlineUser user) {
plugin.runAsync(() -> {
plugin.getRedisManager().setUserServerSwitch(user);
final DataSnapshot.Packed data = user.createSnapshot(DataSnapshot.SaveCause.DISCONNECT);
plugin.getRedisManager().setUserData(user, data);
plugin.getDatabase().addSnapshot(user, data);
});
}
}

View File

@@ -0,0 +1,69 @@
/*
* This file is part of HuskSync, licensed under the Apache License 2.0.
*
* Copyright (c) William278 <will27528@gmail.com>
* Copyright (c) contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.william278.husksync.sync;
import net.william278.husksync.HuskSync;
import net.william278.husksync.data.DataSnapshot;
import net.william278.husksync.user.OnlineUser;
import org.jetbrains.annotations.NotNull;
public class LockstepDataSyncer extends DataSyncer {
public LockstepDataSyncer(@NotNull HuskSync plugin) {
super(plugin);
}
@Override
public void initialize() {
plugin.getRedisManager().clearUsersCheckedOutOnServer();
}
@Override
public void terminate() {
plugin.getRedisManager().clearUsersCheckedOutOnServer();
}
// Consume their data when they are checked in
@Override
public void setUserData(@NotNull OnlineUser user) {
this.listenForRedisData(user, () -> {
if (plugin.getRedisManager().getUserCheckedOut(user).isEmpty()) {
plugin.getRedisManager().setUserCheckedOut(user, true);
plugin.getRedisManager().getUserData(user).ifPresentOrElse(
data -> user.applySnapshot(data, DataSnapshot.UpdateCause.SYNCHRONIZED),
() -> this.setUserFromDatabase(user)
);
return true;
}
return false;
});
}
@Override
public void saveUserData(@NotNull OnlineUser user) {
plugin.runAsync(() -> {
final DataSnapshot.Packed data = user.createSnapshot(DataSnapshot.SaveCause.DISCONNECT);
plugin.getRedisManager().setUserData(user, data);
plugin.getRedisManager().setUserCheckedOut(user, false);
plugin.getDatabase().addSnapshot(user, data);
});
}
}

View File

@@ -158,7 +158,7 @@ public abstract class OnlineUser extends User implements CommandUser, UserDataHo
}
plugin.fireEvent(
plugin.getSyncCompleteEvent(this),
(event) -> plugin.getLockedPlayers().remove(getUuid())
(event) -> plugin.unlockPlayer(getUuid())
);
} else {
cause.getFailedLocale(plugin).ifPresent(this::sendMessage);

View File

@@ -70,6 +70,8 @@ public class DataSnapshotOverview {
}
locales.getLocale("data_manager_cause", snapshot.getSaveCause().getDisplayName())
.ifPresent(user::sendMessage);
locales.getLocale("data_manager_server", snapshot.getServerName())
.ifPresent(user::sendMessage);
// User status data, if present in the snapshot
final Optional<Data.Health> health = snapshot.getHealth();