9
0
mirror of https://github.com/WiIIiam278/HuskSync.git synced 2025-12-19 14:59:21 +00:00

refactor: start lettuce refactor

This commit is contained in:
William278
2025-03-08 13:16:26 +00:00
parent 02c8b899dc
commit 275972a094
12 changed files with 219 additions and 94 deletions

View File

@@ -19,7 +19,7 @@ ext {
set 'version', version.toString()
set 'description', description.toString()
set 'jedis_version', jedis_version.toString()
set 'lettuce_version', lettuce_version.toString()
set 'mysql_driver_version', mysql_driver_version.toString()
set 'mariadb_driver_version', mariadb_driver_version.toString()
set 'postgres_driver_version', postgres_driver_version.toString()

View File

@@ -31,7 +31,6 @@ dependencies {
compileOnly 'com.zaxxer:HikariCP:6.2.1'
compileOnly 'net.william278:DesertWell:2.0.4'
compileOnly 'net.william278:AdvancementAPI:97a9583413'
compileOnly "redis.clients:jedis:$jedis_version"
annotationProcessor 'org.projectlombok:lombok:1.18.36'
}

View File

@@ -1,6 +1,6 @@
# Dependencies for HuskSync on Paper
libraries:
- 'redis.clients:jedis:${jedis_version}'
- 'io.lettuce:lettuce-core:${lettuce_version}'
- 'com.mysql:mysql-connector-j:${mysql_driver_version}'
- 'org.mariadb.jdbc:mariadb-java-client:${mariadb_driver_version}'
- 'org.postgresql:postgresql:${postgres_driver_version}'

View File

@@ -12,7 +12,7 @@ softdepend:
- 'MysqlPlayerDataBridge'
- 'Plan'
libraries:
- 'redis.clients:jedis:${jedis_version}'
- 'io.lettuce:lettuce-core:${lettuce_version}'
- 'com.mysql:mysql-connector-j:${mysql_driver_version}'
- 'org.mariadb.jdbc:mariadb-java-client:${mariadb_driver_version}'
- 'org.postgresql:postgresql:${postgres_driver_version}'

View File

@@ -18,6 +18,7 @@ dependencies {
}
compileOnlyApi 'net.william278.toilet:toilet-common:1.0.12'
compileOnlyApi "io.lettuce:lettuce-core:${lettuce_version}"
compileOnly 'net.william278.uniform:uniform-common:1.3.1'
compileOnly 'com.mojang:brigadier:1.1.8'
@@ -28,14 +29,13 @@ dependencies {
compileOnly "net.kyori:adventure-text-serializer-plain:4.19.0"
compileOnly 'com.google.guava:guava:33.4.0-jre'
compileOnly 'com.github.plan-player-analytics:Plan:5.5.2272'
compileOnly "redis.clients:jedis:$jedis_version"
compileOnly "com.mysql:mysql-connector-j:$mysql_driver_version"
compileOnly "org.mariadb.jdbc:mariadb-java-client:$mariadb_driver_version"
compileOnly "org.postgresql:postgresql:$postgres_driver_version"
compileOnly "org.mongodb:mongodb-driver-sync:$mongodb_driver_version"
compileOnly "org.xerial.snappy:snappy-java:$snappy_version"
testImplementation "redis.clients:jedis:$jedis_version"
testImplementation "redis.clients:jedis:$lettuce_version"
testImplementation "org.xerial.snappy:snappy-java:$snappy_version"
testImplementation 'com.google.guava:guava:33.4.0-jre'
testImplementation 'com.github.plan-player-analytics:Plan:5.5.2272'

View File

@@ -0,0 +1,77 @@
package net.william278.husksync.redis;
import net.william278.husksync.HuskSync;
import net.william278.husksync.data.DataSnapshot;
import net.william278.husksync.user.OnlineUser;
import org.jetbrains.annotations.NotNull;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
public interface MessageHandler {
// Inbound message telling HuskSync on this server to update a user's data (if the users, by UUID, is online)
default void handleUpdateUserData(@NotNull RedisMessage message) {
final Optional<OnlineUser> target = getPlugin().getOnlineUser(message.getTargetUuid());
if (target.isEmpty()) {
return;
}
// Handle the request, update the data locally
final OnlineUser user = target.get();
getPlugin().lockPlayer(user.getUuid());
try {
final DataSnapshot.Packed data = DataSnapshot.deserialize(getPlugin(), message.getPayload());
user.applySnapshot(data, DataSnapshot.UpdateCause.UPDATED);
return;
} catch (Throwable e) {
getPlugin().log(Level.SEVERE, "An exception occurred updating user data from Redis", e);
}
user.completeSync(false, DataSnapshot.UpdateCause.UPDATED, getPlugin());
}
// Inbound message telling HuskSync on this server to reply with a user's data (if the user, by UUID, is online)
default void handleRequestUserData(@NotNull RedisMessage message) {
final Optional<OnlineUser> target = getPlugin().getOnlineUser(message.getTargetUuid());
if (target.isEmpty()) {
return;
}
// Handle the request, return the data
final OnlineUser user = target.get();
final RedisMessage reply = RedisMessage.create(
UUID.fromString(new String(message.getPayload(), StandardCharsets.UTF_8)),
user.createSnapshot(DataSnapshot.SaveCause.INVENTORY_COMMAND).asBytes(getPlugin())
);
reply.dispatch(getPlugin(), RedisMessage.Type.RETURN_USER_DATA);
}
// Inbound message containing returned user data from a REQUEST_USER_DATA message. If the server had made a request
// then it will complete the future in the pendingRequests map.
default void handleReturnUserData(@NotNull RedisMessage message) {
final UUID requestId = message.getTargetUuid();
final CompletableFuture<Optional<DataSnapshot.Packed>> future = getPendingRequests().get(requestId);
if (future == null) {
return;
}
try {
final DataSnapshot.Packed data = DataSnapshot.deserialize(getPlugin(), message.getPayload());
future.complete(Optional.of(data));
} catch (Throwable e) {
getPlugin().log(Level.SEVERE, "An exception occurred returning user data from Redis", e);
future.complete(Optional.empty());
}
getPendingRequests().remove(requestId);
}
@NotNull
Map<UUID, CompletableFuture<Optional<DataSnapshot.Packed>>> getPendingRequests();
@NotNull
HuskSync getPlugin();
}

View File

@@ -0,0 +1,39 @@
package net.william278.husksync.redis;
import io.lettuce.core.pubsub.RedisPubSubListener;
import net.william278.husksync.HuskSync;
import org.jetbrains.annotations.NotNull;
import java.util.logging.Level;
public interface PubSubListener extends RedisPubSubListener<String, String> {
@Override
default void message(String pattern, String channel, String message) {
getPlugin().log(Level.WARNING, "[Redis] Got message on pattern channel '%s'".formatted(channel));
}
@Override
default void subscribed(String channel, long count) {
getPlugin().log(Level.INFO, "[Redis] Subscribed to channel '%s'".formatted(channel));
}
@Override
default void unsubscribed(String channel, long count) {
getPlugin().log(Level.INFO, "[Redis] Unsubscribed from channel '%s'".formatted(channel));
}
@Override
default void psubscribed(String pattern, long count) {
getPlugin().log(Level.INFO, "[Redis] Subscribed to pattern '%s'".formatted(pattern));
}
@Override
default void punsubscribed(String pattern, long count) {
getPlugin().log(Level.INFO, "[Redis] Unsubscribed from pattern '%s'".formatted(pattern));
}
@NotNull
HuskSync getPlugin();
}

View File

@@ -19,6 +19,10 @@
package net.william278.husksync.redis;
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.support.AsyncPool;
import lombok.Getter;
import net.william278.husksync.HuskSync;
import net.william278.husksync.config.Settings;
import net.william278.husksync.data.DataSnapshot;
@@ -26,9 +30,6 @@ import net.william278.husksync.user.User;
import org.jetbrains.annotations.Blocking;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import redis.clients.jedis.*;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.util.Pool;
import java.nio.charset.StandardCharsets;
import java.util.*;
@@ -40,18 +41,22 @@ import java.util.logging.Level;
/**
* Manages the connection to Redis, handling the caching of user data
*/
public class RedisManager extends JedisPubSub {
public class RedisManager implements MessageHandler, PubSubListener {
protected static final String KEY_NAMESPACE = "husksync:";
private static final int RECONNECTION_TIME = 8000;
@Getter
private final HuskSync plugin;
private final String clusterId;
private Pool<Jedis> jedisPool;
@Getter
private final Map<UUID, CompletableFuture<Optional<DataSnapshot.Packed>>> pendingRequests;
private final String clusterId;
private AsyncPool<StatefulRedisConnection<String, String>> connectionPool;
private boolean enabled;
private boolean reconnected;
private boolean closed = false;
public RedisManager(@NotNull HuskSync plugin) {
this.plugin = plugin;
@@ -71,6 +76,7 @@ public class RedisManager extends JedisPubSub {
final boolean useSSL = credentials.isUseSsl();
// Create the jedis pool
final RedisClient client = RedisClient.create();
final JedisPoolConfig config = new JedisPoolConfig();
config.setMaxIdle(0);
config.setTestOnBorrow(true);
@@ -79,18 +85,18 @@ public class RedisManager extends JedisPubSub {
final Settings.RedisSettings.RedisSentinel sentinel = plugin.getSettings().getRedis().getSentinel();
Set<String> redisSentinelNodes = new HashSet<>(sentinel.getNodes());
if (redisSentinelNodes.isEmpty()) {
this.jedisPool = password.isEmpty()
this.connectionPool = password.isEmpty()
? new JedisPool(config, host, port, 0, useSSL)
: new JedisPool(config, host, port, 0, password, useSSL);
} else {
final String sentinelPassword = sentinel.getPassword();
this.jedisPool = new JedisSentinelPool(sentinel.getMaster(), redisSentinelNodes, password.isEmpty()
this.connectionPool = new JedisSentinelPool(sentinel.getMaster(), redisSentinelNodes, password.isEmpty()
? null : password, sentinelPassword.isEmpty() ? null : sentinelPassword);
}
// Ping the server to check the connection
try {
jedisPool.getResource().ping();
connectionPool.getResource().ping();
} catch (JedisException e) {
throw new IllegalStateException("Failed to establish connection with Redis. "
+ "Please check the supplied credentials in the config file", e);
@@ -103,8 +109,8 @@ public class RedisManager extends JedisPubSub {
@Blocking
private void subscribe() {
while (enabled && !Thread.interrupted() && jedisPool != null && !jedisPool.isClosed()) {
try (Jedis jedis = jedisPool.getResource()) {
while (enabled && !Thread.interrupted() && connectionPool != null && !connectionPool.isClosed()) {
try (Jedis jedis = connectionPool.getResource()) {
if (reconnected) {
plugin.log(Level.INFO, "Redis connection is alive again");
}
@@ -150,65 +156,27 @@ public class RedisManager extends JedisPubSub {
}
@Override
public void onMessage(@NotNull String channel, @NotNull String message) {
final RedisMessage.Type messageType = RedisMessage.Type.getTypeFromChannel(channel, clusterId).orElse(null);
if (messageType == null) {
public void message(@NotNull String channel, @NotNull String body) {
final RedisMessage.Type type = RedisMessage.Type.getType(channel, clusterId);
if (type == null) {
return;
}
final RedisMessage redisMessage = RedisMessage.fromJson(plugin, message);
switch (messageType) {
case UPDATE_USER_DATA -> plugin.getOnlineUser(redisMessage.getTargetUuid()).ifPresent(
user -> {
plugin.lockPlayer(user.getUuid());
try {
final DataSnapshot.Packed data = DataSnapshot.deserialize(plugin, redisMessage.getPayload());
user.applySnapshot(data, DataSnapshot.UpdateCause.UPDATED);
} catch (Throwable e) {
plugin.log(Level.SEVERE, "An exception occurred updating user data from Redis", e);
user.completeSync(false, DataSnapshot.UpdateCause.UPDATED, plugin);
final RedisMessage message = RedisMessage.fromJson(plugin, body);
switch (type) {
case UPDATE_USER_DATA -> handleUpdateUserData(message);
case REQUEST_USER_DATA -> handleRequestUserData(message);
case RETURN_USER_DATA -> handleReturnUserData(message);
default -> plugin.log(Level.SEVERE, "Received unknown message type: " + type);
}
}
);
case REQUEST_USER_DATA -> plugin.getOnlineUser(redisMessage.getTargetUuid()).ifPresent(
user -> RedisMessage.create(
UUID.fromString(new String(redisMessage.getPayload(), StandardCharsets.UTF_8)),
user.createSnapshot(DataSnapshot.SaveCause.INVENTORY_COMMAND).asBytes(plugin)
).dispatch(plugin, RedisMessage.Type.RETURN_USER_DATA)
);
case RETURN_USER_DATA -> {
final CompletableFuture<Optional<DataSnapshot.Packed>> future = pendingRequests.get(
redisMessage.getTargetUuid()
);
if (future != null) {
try {
final DataSnapshot.Packed data = DataSnapshot.deserialize(plugin, redisMessage.getPayload());
future.complete(Optional.of(data));
} catch (Throwable e) {
plugin.log(Level.SEVERE, "An exception occurred returning user data from Redis", e);
future.complete(Optional.empty());
}
pendingRequests.remove(redisMessage.getTargetUuid());
}
}
}
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
plugin.log(Level.INFO, "Redis subscribed to channel '" + channel + "'");
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
plugin.log(Level.INFO, "Redis unsubscribed from channel '" + channel + "'");
}
@Blocking
protected void sendMessage(@NotNull String channel, @NotNull String message) {
try (Jedis jedis = jedisPool.getResource()) {
jedis.publish(channel, message);
if (closed) {
return;
}
connection.async().publish(channel, message);
}
public void sendUserDataUpdate(@NotNull User user, @NotNull DataSnapshot.Packed data) {
@@ -252,7 +220,7 @@ public class RedisManager extends JedisPubSub {
// Set a user's data to Redis
@Blocking
public void setUserData(@NotNull User user, @NotNull DataSnapshot.Packed data) {
try (Jedis jedis = jedisPool.getResource()) {
try (Jedis jedis = connectionPool.getResource()) {
jedis.setex(
getKey(RedisKeyType.LATEST_SNAPSHOT, user.getUuid(), clusterId),
RedisKeyType.TTL_1_YEAR,
@@ -266,7 +234,7 @@ public class RedisManager extends JedisPubSub {
@Blocking
public void clearUserData(@NotNull User user) {
try (Jedis jedis = jedisPool.getResource()) {
try (Jedis jedis = connectionPool.getResource()) {
jedis.del(
getKey(RedisKeyType.LATEST_SNAPSHOT, user.getUuid(), clusterId)
);
@@ -278,7 +246,7 @@ public class RedisManager extends JedisPubSub {
@Blocking
public void setUserCheckedOut(@NotNull User user, boolean checkedOut) {
try (Jedis jedis = jedisPool.getResource()) {
try (Jedis jedis = connectionPool.getResource()) {
final String key = getKeyString(RedisKeyType.DATA_CHECKOUT, user.getUuid(), clusterId);
if (checkedOut) {
jedis.set(
@@ -301,7 +269,7 @@ public class RedisManager extends JedisPubSub {
@Blocking
public Optional<String> getUserCheckedOut(@NotNull User user) {
try (Jedis jedis = jedisPool.getResource()) {
try (Jedis jedis = connectionPool.getResource()) {
final byte[] key = getKey(RedisKeyType.DATA_CHECKOUT, user.getUuid(), clusterId);
final byte[] readData = jedis.get(key);
if (readData != null) {
@@ -321,7 +289,7 @@ public class RedisManager extends JedisPubSub {
@Blocking
public void clearUsersCheckedOutOnServer() {
final String keyFormat = String.format("%s*", RedisKeyType.DATA_CHECKOUT.getKeyPrefix(clusterId));
try (Jedis jedis = jedisPool.getResource()) {
try (Jedis jedis = connectionPool.getResource()) {
final Set<String> keys = jedis.keys(keyFormat);
if (keys == null) {
plugin.log(Level.WARNING, "Checkout key returned null from Redis during clearing");
@@ -344,7 +312,7 @@ public class RedisManager extends JedisPubSub {
*/
@Blocking
public void setUserServerSwitch(@NotNull User user) {
try (Jedis jedis = jedisPool.getResource()) {
try (Jedis jedis = connectionPool.getResource()) {
jedis.setex(
getKey(RedisKeyType.SERVER_SWITCH, user.getUuid(), clusterId),
RedisKeyType.TTL_10_SECONDS,
@@ -365,7 +333,7 @@ public class RedisManager extends JedisPubSub {
*/
@Blocking
public Optional<DataSnapshot.Packed> getUserData(@NotNull User user) {
try (Jedis jedis = jedisPool.getResource()) {
try (Jedis jedis = connectionPool.getResource()) {
final byte[] key = getKey(RedisKeyType.LATEST_SNAPSHOT, user.getUuid(), clusterId);
final byte[] dataByteArray = jedis.get(key);
if (dataByteArray == null) {
@@ -389,7 +357,7 @@ public class RedisManager extends JedisPubSub {
@Blocking
public boolean getUserServerSwitch(@NotNull User user) {
try (Jedis jedis = jedisPool.getResource()) {
try (Jedis jedis = connectionPool.getResource()) {
final byte[] key = getKey(RedisKeyType.SERVER_SWITCH, user.getUuid(), clusterId);
final byte[] readData = jedis.get(key);
if (readData == null) {
@@ -411,7 +379,7 @@ public class RedisManager extends JedisPubSub {
@Blocking
public String getStatusDump() {
try (Jedis jedis = jedisPool.getResource()) {
try (Jedis jedis = connectionPool.getResource()) {
return jedis.info();
}
}
@@ -419,7 +387,7 @@ public class RedisManager extends JedisPubSub {
@Blocking
public long getLatency() {
final long startTime = System.currentTimeMillis();
try (Jedis jedis = jedisPool.getResource()) {
try (Jedis jedis = connectionPool.getResource()) {
jedis.ping();
return startTime - System.currentTimeMillis();
}
@@ -438,7 +406,7 @@ public class RedisManager extends JedisPubSub {
@Blocking
public void bindMapIds(@NotNull String fromServer, int fromId, @NotNull String toServer, int toId) {
try (Jedis jedis = jedisPool.getResource()) {
try (Jedis jedis = connectionPool.getResource()) {
jedis.setex(
getMapIdKey(fromServer, fromId, toServer, clusterId),
RedisKeyType.TTL_1_YEAR,
@@ -457,7 +425,7 @@ public class RedisManager extends JedisPubSub {
@Blocking
public Optional<Integer> getBoundMapId(@NotNull String fromServer, int fromId, @NotNull String toServer) {
try (Jedis jedis = jedisPool.getResource()) {
try (Jedis jedis = connectionPool.getResource()) {
final byte[] readData = jedis.get(getMapIdKey(fromServer, fromId, toServer, clusterId));
if (readData == null) {
plugin.debug(String.format("[%s:%s] No bound map id for server %s Redis",
@@ -476,7 +444,7 @@ public class RedisManager extends JedisPubSub {
@Blocking
public @Nullable Map.Entry<String, Integer> getReversedMapBound(@NotNull String toServer, int toId) {
try (Jedis jedis = jedisPool.getResource()) {
try (Jedis jedis = connectionPool.getResource()) {
final byte[] readData = jedis.get(getReversedMapIdKey(toServer, toId, clusterId));
if (readData == null) {
plugin.debug(String.format("[%s:%s] No reversed map bound on Redis",
@@ -496,7 +464,7 @@ public class RedisManager extends JedisPubSub {
@Blocking
public void setMapData(@NotNull String serverName, int mapId, byte[] data) {
try (Jedis jedis = jedisPool.getResource()) {
try (Jedis jedis = connectionPool.getResource()) {
jedis.setex(
getMapDataKey(serverName, mapId, clusterId),
RedisKeyType.TTL_1_YEAR,
@@ -510,7 +478,7 @@ public class RedisManager extends JedisPubSub {
@Blocking
public byte @Nullable [] getMapData(@NotNull String serverName, int mapId) {
try (Jedis jedis = jedisPool.getResource()) {
try (Jedis jedis = connectionPool.getResource()) {
final byte[] readData = jedis.get(getMapDataKey(serverName, mapId, clusterId));
if (readData == null) {
plugin.debug(String.format("[%s:%s] No map data on Redis",
@@ -530,9 +498,9 @@ public class RedisManager extends JedisPubSub {
@Blocking
public void terminate() {
enabled = false;
if (jedisPool != null) {
if (!jedisPool.isClosed()) {
jedisPool.close();
if (connectionPool != null) {
if (!connectionPool.isClosed()) {
connectionPool.close();
}
}
this.unsubscribe();

View File

@@ -26,11 +26,9 @@ import lombok.Setter;
import net.william278.husksync.HuskSync;
import net.william278.husksync.adapter.Adaptable;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.Arrays;
import java.util.Locale;
import java.util.Optional;
import java.util.UUID;
import java.util.*;
public class RedisMessage implements Adaptable {
@@ -82,7 +80,6 @@ public class RedisMessage implements Adaptable {
REQUEST_USER_DATA,
RETURN_USER_DATA;
@NotNull
public String getMessageChannel(@NotNull String clusterId) {
return String.format(
"%s:%s:%s",
@@ -92,10 +89,10 @@ public class RedisMessage implements Adaptable {
);
}
public static Optional<Type> getTypeFromChannel(@NotNull String channel, @NotNull String clusterId) {
public static @Nullable Type getType(@NotNull String channel, @NotNull String clusterId) {
return Arrays.stream(values())
.filter(messageType -> messageType.getMessageChannel(clusterId).equalsIgnoreCase(channel))
.findFirst();
.findAny().orElse(null);
}
}

View File

@@ -0,0 +1,45 @@
package net.william278.husksync.redis;
import io.lettuce.core.codec.RedisCodec;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
public class StringByteArrayCodec implements RedisCodec<String, byte[]> {
public static final StringByteArrayCodec INSTANCE = new StringByteArrayCodec();
private static final byte[] EMPTY = new byte[0];
private final Charset charset = StandardCharsets.UTF_8;
@Override
public String decodeKey(final ByteBuffer bytes) {
return charset.decode(bytes).toString();
}
@Override
public byte[] decodeValue(final ByteBuffer bytes) {
return getBytes(bytes);
}
@Override
public ByteBuffer encodeKey(final String key) {
return charset.encode(key);
}
@Override
public ByteBuffer encodeValue(final byte[] value) {
if (value == null) {
return ByteBuffer.wrap(EMPTY);
}
return ByteBuffer.wrap(value);
}
private static byte[] getBytes(final ByteBuffer buffer) {
final byte[] b = new byte[buffer.remaining()];
buffer.get(b);
return b;
}
}

View File

@@ -22,7 +22,7 @@ dependencies {
implementation include("com.mysql:mysql-connector-j:$mysql_driver_version")
implementation include("org.postgresql:postgresql:$postgres_driver_version")
implementation include("org.mariadb.jdbc:mariadb-java-client:$mariadb_driver_version")
implementation include("redis.clients:jedis:$jedis_version")
implementation include("io.lettuce:lettuce-core:$lettuce_version")
implementation include("org.xerial.snappy:snappy-java:$snappy_version")
compileOnly 'net.william278:DesertWell:2.0.4'

View File

@@ -9,7 +9,7 @@ plugin_archive=husksync
plugin_description=A modern, cross-server player data synchronization system
# General settings
jedis_version=5.2.0
lettuce_version=6.5.3.RELEASE
mysql_driver_version=9.2.0
mariadb_driver_version=3.5.1
postgres_driver_version=42.7.5