mirror of
https://github.com/WiIIiam278/HuskSync.git
synced 2025-12-19 14:59:21 +00:00
feat: add more Redis configuration options (#564)
This commit is contained in:
@@ -174,7 +174,45 @@ public class Settings {
|
||||
private int database = 0;
|
||||
private String user = "";
|
||||
private String password = "";
|
||||
|
||||
@Comment("Use SSL/TLS for encrypted connections.")
|
||||
private boolean useSsl = false;
|
||||
|
||||
@Comment("Connection timeout in milliseconds.")
|
||||
private int connectionTimeout = 2000;
|
||||
|
||||
@Comment("Socket (read/write) timeout in milliseconds.")
|
||||
private int socketTimeout = 2000;
|
||||
|
||||
@Comment("Max number of connections in the pool.")
|
||||
private int maxTotalConnections = 50;
|
||||
|
||||
@Comment("Max number of idle connections in the pool.")
|
||||
private int maxIdleConnections = 8;
|
||||
|
||||
@Comment("Min number of idle connections in the pool.")
|
||||
private int minIdleConnections = 2;
|
||||
|
||||
@Comment("Enable health checks when borrowing connections from the pool.")
|
||||
private boolean testOnBorrow = true;
|
||||
|
||||
@Comment("Enable health checks when returning connections to the pool.")
|
||||
private boolean testOnReturn = true;
|
||||
|
||||
@Comment("Enable periodic idle connection health checks.")
|
||||
private boolean testWhileIdle = true;
|
||||
|
||||
@Comment("Min evictable idle time (ms) before a connection is eligible for eviction.")
|
||||
private long minEvictableIdleTimeMillis = 60000;
|
||||
|
||||
@Comment("Time (ms) between eviction runs.")
|
||||
private long timeBetweenEvictionRunsMillis = 30000;
|
||||
|
||||
@Comment("Number of retries for commands when connection fails.")
|
||||
private int maxRetries = 3;
|
||||
|
||||
@Comment("Base backoff time in ms for retries (exponential backoff multiplier).")
|
||||
private int retryBackoffMillis = 200;
|
||||
}
|
||||
|
||||
@Comment("Options for if you're using Redis sentinel. Don't modify this unless you know what you're doing!")
|
||||
|
||||
@@ -62,9 +62,11 @@ public class RedisManager extends JedisPubSub {
|
||||
/**
|
||||
* Initialize Redis connection pool
|
||||
*/
|
||||
|
||||
@Blocking
|
||||
public void initialize() throws IllegalStateException {
|
||||
final Settings.RedisSettings.RedisCredentials credentials = plugin.getSettings().getRedis().getCredentials();
|
||||
|
||||
final String user = credentials.getUser();
|
||||
final String password = credentials.getPassword();
|
||||
final String host = credentials.getHost();
|
||||
@@ -72,44 +74,50 @@ public class RedisManager extends JedisPubSub {
|
||||
final int database = credentials.getDatabase();
|
||||
final boolean useSSL = credentials.isUseSsl();
|
||||
|
||||
// Create the jedis pool
|
||||
// Configure JedisPoolConfig
|
||||
final JedisPoolConfig config = new JedisPoolConfig();
|
||||
config.setMaxIdle(0);
|
||||
config.setTestOnBorrow(true);
|
||||
config.setTestOnReturn(true);
|
||||
config.setMaxTotal(credentials.getMaxTotalConnections());
|
||||
config.setMaxIdle(credentials.getMaxIdleConnections());
|
||||
config.setMinIdle(credentials.getMinIdleConnections());
|
||||
config.setTestOnBorrow(credentials.isTestOnBorrow());
|
||||
config.setTestOnReturn(credentials.isTestOnReturn());
|
||||
config.setTestWhileIdle(credentials.isTestWhileIdle());
|
||||
config.setMinEvictableIdleTimeMillis(credentials.getMinEvictableIdleTimeMillis());
|
||||
config.setTimeBetweenEvictionRunsMillis(credentials.getTimeBetweenEvictionRunsMillis());
|
||||
|
||||
final Settings.RedisSettings.RedisSentinel sentinel = plugin.getSettings().getRedis().getSentinel();
|
||||
Set<String> redisSentinelNodes = new HashSet<>(sentinel.getNodes());
|
||||
|
||||
if (redisSentinelNodes.isEmpty()) {
|
||||
// Standalone Redis setup
|
||||
DefaultJedisClientConfig.Builder clientConfigBuilder = DefaultJedisClientConfig.builder()
|
||||
.ssl(useSSL)
|
||||
.database(database)
|
||||
.timeoutMillis(0);
|
||||
|
||||
if (!user.isEmpty()) {
|
||||
clientConfigBuilder.user(user);
|
||||
}
|
||||
|
||||
if (!password.isEmpty()) {
|
||||
clientConfigBuilder.password(password);
|
||||
}
|
||||
.timeoutMillis(credentials.getConnectionTimeout()) // connection and socket timeout combined
|
||||
.user(user.isEmpty() ? null : user)
|
||||
.password(password.isEmpty() ? null : password);
|
||||
|
||||
this.jedisPool = new JedisPool(config, new HostAndPort(host, port), clientConfigBuilder.build());
|
||||
} else {
|
||||
final String sentinelPassword = sentinel.getPassword();
|
||||
this.jedisPool = new JedisSentinelPool(sentinel.getMaster(), redisSentinelNodes, password.isEmpty()
|
||||
? null : password, sentinelPassword.isEmpty() ? null : sentinelPassword);
|
||||
this.jedisPool = new JedisSentinelPool(
|
||||
sentinel.getMaster(),
|
||||
redisSentinelNodes,
|
||||
config,
|
||||
credentials.getConnectionTimeout(),
|
||||
credentials.getSocketTimeout(),
|
||||
password.isEmpty() ? null : password,
|
||||
sentinelPassword.isEmpty() ? null : sentinelPassword,
|
||||
database);
|
||||
}
|
||||
|
||||
// Ping the server to check the connection
|
||||
try {
|
||||
jedisPool.getResource().ping();
|
||||
try (var jedis = jedisPool.getResource()) {
|
||||
jedis.ping();
|
||||
} catch (JedisException e) {
|
||||
throw new IllegalStateException("Failed to establish connection with Redis. "
|
||||
+ "Please check the supplied credentials in the config file", e);
|
||||
throw new IllegalStateException("Failed to establish connection with Redis. " +
|
||||
"Please check the supplied credentials in the config file", e);
|
||||
}
|
||||
|
||||
// Subscribe using a thread (rather than a task)
|
||||
enabled = true;
|
||||
new Thread(this::subscribe, "husksync:redis_subscriber").start();
|
||||
}
|
||||
@@ -126,8 +134,7 @@ public class RedisManager extends JedisPubSub {
|
||||
this,
|
||||
Arrays.stream(RedisMessage.Type.values())
|
||||
.map(type -> type.getMessageChannel(clusterId))
|
||||
.toArray(String[]::new)
|
||||
);
|
||||
.toArray(String[]::new));
|
||||
} catch (Throwable t) {
|
||||
// Thread was unlocked due error
|
||||
onThreadUnlock(t);
|
||||
@@ -175,20 +182,19 @@ public class RedisManager extends JedisPubSub {
|
||||
user -> {
|
||||
plugin.lockPlayer(user.getUuid());
|
||||
try {
|
||||
final DataSnapshot.Packed data = DataSnapshot.deserialize(plugin, redisMessage.getPayload());
|
||||
final DataSnapshot.Packed data = DataSnapshot.deserialize(plugin,
|
||||
redisMessage.getPayload());
|
||||
user.applySnapshot(data, DataSnapshot.UpdateCause.UPDATED);
|
||||
} catch (Throwable e) {
|
||||
plugin.log(Level.SEVERE, "An exception occurred updating user data from Redis", e);
|
||||
user.completeSync(false, DataSnapshot.UpdateCause.UPDATED, plugin);
|
||||
}
|
||||
}
|
||||
);
|
||||
});
|
||||
case REQUEST_USER_DATA -> redisMessage.getTargetUser(plugin).ifPresent(
|
||||
user -> RedisMessage.create(
|
||||
UUID.fromString(new String(redisMessage.getPayload(), StandardCharsets.UTF_8)),
|
||||
user.createSnapshot(DataSnapshot.SaveCause.INVENTORY_COMMAND).asBytes(plugin)
|
||||
).dispatch(plugin, RedisMessage.Type.RETURN_USER_DATA)
|
||||
);
|
||||
user.createSnapshot(DataSnapshot.SaveCause.INVENTORY_COMMAND).asBytes(plugin))
|
||||
.dispatch(plugin, RedisMessage.Type.RETURN_USER_DATA));
|
||||
case CHECK_IN_PETITION -> {
|
||||
if (!redisMessage.isTargetServer(plugin)
|
||||
|| !plugin.getSettings().getSynchronization().isCheckinPetitions()) {
|
||||
@@ -199,7 +205,8 @@ public class RedisManager extends JedisPubSub {
|
||||
boolean online = plugin.getDisconnectingPlayers().contains(user.getUuid())
|
||||
|| plugin.getOnlineUser(user.getUuid()).isEmpty();
|
||||
if (!online && !plugin.isLocked(user.getUuid())) {
|
||||
plugin.debug("[%s] Received check-in petition for online/unlocked user, ignoring".formatted(user.getName()));
|
||||
plugin.debug("[%s] Received check-in petition for online/unlocked user, ignoring"
|
||||
.formatted(user.getName()));
|
||||
return;
|
||||
}
|
||||
plugin.getRedisManager().setUserCheckedOut(user, false);
|
||||
@@ -252,31 +259,30 @@ public class RedisManager extends JedisPubSub {
|
||||
redisMessage.dispatch(plugin, RedisMessage.Type.CHECK_IN_PETITION);
|
||||
}
|
||||
|
||||
public CompletableFuture<Optional<DataSnapshot.Packed>> getOnlineUserData(@NotNull UUID requestId, @NotNull User user,
|
||||
public CompletableFuture<Optional<DataSnapshot.Packed>> getOnlineUserData(@NotNull UUID requestId,
|
||||
@NotNull User user,
|
||||
@NotNull DataSnapshot.SaveCause saveCause) {
|
||||
return plugin.getOnlineUser(user.getUuid())
|
||||
.map(online -> CompletableFuture.completedFuture(
|
||||
Optional.of(online.createSnapshot(saveCause)))
|
||||
)
|
||||
Optional.of(online.createSnapshot(saveCause))))
|
||||
.orElse(this.getNetworkedUserData(requestId, user));
|
||||
}
|
||||
|
||||
// Request a user's dat x-server
|
||||
private CompletableFuture<Optional<DataSnapshot.Packed>> getNetworkedUserData(@NotNull UUID requestId, @NotNull User user) {
|
||||
private CompletableFuture<Optional<DataSnapshot.Packed>> getNetworkedUserData(@NotNull UUID requestId,
|
||||
@NotNull User user) {
|
||||
final CompletableFuture<Optional<DataSnapshot.Packed>> future = new CompletableFuture<>();
|
||||
pendingRequests.put(requestId, future);
|
||||
plugin.runAsync(() -> {
|
||||
final RedisMessage redisMessage = RedisMessage.create(
|
||||
user.getUuid(),
|
||||
requestId.toString().getBytes(StandardCharsets.UTF_8)
|
||||
);
|
||||
requestId.toString().getBytes(StandardCharsets.UTF_8));
|
||||
redisMessage.dispatch(plugin, RedisMessage.Type.REQUEST_USER_DATA);
|
||||
});
|
||||
return future
|
||||
.orTimeout(
|
||||
plugin.getSettings().getSynchronization().getNetworkLatencyMilliseconds(),
|
||||
TimeUnit.MILLISECONDS
|
||||
)
|
||||
TimeUnit.MILLISECONDS)
|
||||
.exceptionally(throwable -> {
|
||||
pendingRequests.remove(requestId);
|
||||
return Optional.empty();
|
||||
@@ -290,8 +296,7 @@ public class RedisManager extends JedisPubSub {
|
||||
jedis.setex(
|
||||
getKey(RedisKeyType.LATEST_SNAPSHOT, user.getUuid(), clusterId),
|
||||
RedisKeyType.TTL_1_YEAR,
|
||||
data.asBytes(plugin)
|
||||
);
|
||||
data.asBytes(plugin));
|
||||
plugin.debug(String.format("[%s] Set %s key on Redis", user.getName(), RedisKeyType.LATEST_SNAPSHOT));
|
||||
} catch (Throwable e) {
|
||||
plugin.log(Level.SEVERE, "An exception occurred setting user data on Redis", e);
|
||||
@@ -302,8 +307,7 @@ public class RedisManager extends JedisPubSub {
|
||||
public void clearUserData(@NotNull User user) {
|
||||
try (Jedis jedis = jedisPool.getResource()) {
|
||||
jedis.del(
|
||||
getKey(RedisKeyType.LATEST_SNAPSHOT, user.getUuid(), clusterId)
|
||||
);
|
||||
getKey(RedisKeyType.LATEST_SNAPSHOT, user.getUuid(), clusterId));
|
||||
plugin.debug(String.format("[%s] Cleared %s on Redis", user.getName(), RedisKeyType.LATEST_SNAPSHOT));
|
||||
} catch (Throwable e) {
|
||||
plugin.log(Level.SEVERE, "An exception occurred clearing user data on Redis", e);
|
||||
@@ -317,8 +321,7 @@ public class RedisManager extends JedisPubSub {
|
||||
if (checkedOut) {
|
||||
jedis.set(
|
||||
key.getBytes(StandardCharsets.UTF_8),
|
||||
plugin.getServerName().getBytes(StandardCharsets.UTF_8)
|
||||
);
|
||||
plugin.getServerName().getBytes(StandardCharsets.UTF_8));
|
||||
} else {
|
||||
if (jedis.del(key.getBytes(StandardCharsets.UTF_8)) == 0) {
|
||||
plugin.debug(String.format("[%s] %s key not set on Redis when attempting removal (%s)",
|
||||
@@ -382,8 +385,7 @@ public class RedisManager extends JedisPubSub {
|
||||
jedis.setex(
|
||||
getKey(RedisKeyType.SERVER_SWITCH, user.getUuid(), clusterId),
|
||||
RedisKeyType.TTL_10_SECONDS,
|
||||
new byte[0]
|
||||
);
|
||||
new byte[0]);
|
||||
plugin.debug(String.format("[%s] Set %s key to Redis",
|
||||
user.getName(), RedisKeyType.SERVER_SWITCH));
|
||||
} catch (Throwable e) {
|
||||
@@ -395,7 +397,8 @@ public class RedisManager extends JedisPubSub {
|
||||
* Fetch a user's data from Redis and consume the key if found
|
||||
*
|
||||
* @param user The user to fetch data for
|
||||
* @return The user's data, if it's present on the database. Otherwise, an empty optional.
|
||||
* @return The user's data, if it's present on the database. Otherwise, an empty
|
||||
* optional.
|
||||
*/
|
||||
@Blocking
|
||||
public Optional<DataSnapshot.Packed> getUserData(@NotNull User user) {
|
||||
@@ -476,13 +479,11 @@ public class RedisManager extends JedisPubSub {
|
||||
jedis.setex(
|
||||
getMapIdKey(fromServer, fromId, toServer, clusterId),
|
||||
RedisKeyType.TTL_1_YEAR,
|
||||
String.valueOf(toId).getBytes(StandardCharsets.UTF_8)
|
||||
);
|
||||
String.valueOf(toId).getBytes(StandardCharsets.UTF_8));
|
||||
jedis.setex(
|
||||
getReversedMapIdKey(toServer, toId, clusterId),
|
||||
RedisKeyType.TTL_1_YEAR,
|
||||
String.format("%s:%s", fromServer, fromId).getBytes(StandardCharsets.UTF_8)
|
||||
);
|
||||
String.format("%s:%s", fromServer, fromId).getBytes(StandardCharsets.UTF_8));
|
||||
plugin.debug(String.format("Bound map %s:%s -> %s:%s on Redis", fromServer, fromId, toServer, toId));
|
||||
} catch (Throwable e) {
|
||||
plugin.log(Level.SEVERE, "An exception occurred binding map ids on Redis", e);
|
||||
@@ -534,8 +535,7 @@ public class RedisManager extends JedisPubSub {
|
||||
jedis.setex(
|
||||
getMapDataKey(serverName, mapId, clusterId),
|
||||
RedisKeyType.TTL_1_YEAR,
|
||||
data
|
||||
);
|
||||
data);
|
||||
plugin.debug(String.format("Set map data %s:%s on Redis", serverName, mapId));
|
||||
} catch (Throwable e) {
|
||||
plugin.log(Level.SEVERE, "An exception occurred setting map data on Redis", e);
|
||||
@@ -581,16 +581,20 @@ public class RedisManager extends JedisPubSub {
|
||||
return String.format("%s:%s", keyType.getKeyPrefix(clusterId), uuid);
|
||||
}
|
||||
|
||||
private static byte[] getMapIdKey(@NotNull String fromServer, int fromId, @NotNull String toServer, @NotNull String clusterId) {
|
||||
return String.format("%s:%s:%s:%s", RedisKeyType.MAP_ID.getKeyPrefix(clusterId), fromServer, fromId, toServer).getBytes(StandardCharsets.UTF_8);
|
||||
private static byte[] getMapIdKey(@NotNull String fromServer, int fromId, @NotNull String toServer,
|
||||
@NotNull String clusterId) {
|
||||
return String.format("%s:%s:%s:%s", RedisKeyType.MAP_ID.getKeyPrefix(clusterId), fromServer, fromId, toServer)
|
||||
.getBytes(StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
private static byte[] getReversedMapIdKey(@NotNull String toServer, int toId, @NotNull String clusterId) {
|
||||
return String.format("%s:%s:%s", RedisKeyType.MAP_ID_REVERSED.getKeyPrefix(clusterId), toServer, toId).getBytes(StandardCharsets.UTF_8);
|
||||
return String.format("%s:%s:%s", RedisKeyType.MAP_ID_REVERSED.getKeyPrefix(clusterId), toServer, toId)
|
||||
.getBytes(StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
private static byte[] getMapDataKey(@NotNull String serverName, int mapId, @NotNull String clusterId) {
|
||||
return String.format("%s:%s:%s", RedisKeyType.MAP_DATA.getKeyPrefix(clusterId), serverName, mapId).getBytes(StandardCharsets.UTF_8);
|
||||
return String.format("%s:%s:%s", RedisKeyType.MAP_DATA.getKeyPrefix(clusterId), serverName, mapId)
|
||||
.getBytes(StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user