mirror of
https://github.com/WiIIiam278/HuskSync.git
synced 2025-12-26 18:19:10 +00:00
Use JedisPool instead of single Jedis connection
This commit is contained in:
@@ -17,7 +17,6 @@ import org.bukkit.scheduler.BukkitTask;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.logging.Level;
|
||||
|
||||
public final class HuskSyncBukkit extends JavaPlugin {
|
||||
@@ -32,6 +31,8 @@ public final class HuskSyncBukkit extends JavaPlugin {
|
||||
|
||||
public static BukkitDataCache bukkitCache;
|
||||
|
||||
public static BukkitRedisListener redisListener;
|
||||
|
||||
// Used for establishing a handshake with redis
|
||||
public static UUID serverUUID;
|
||||
|
||||
@@ -120,7 +121,8 @@ public final class HuskSyncBukkit extends JavaPlugin {
|
||||
getServer().getPluginManager().registerEvents(new BukkitEventListener(), this);
|
||||
|
||||
// Initialize the redis listener
|
||||
if (!new BukkitRedisListener().isActiveAndEnabled) {
|
||||
redisListener = new BukkitRedisListener();
|
||||
if (!redisListener.isActiveAndEnabled) {
|
||||
getPluginLoader().disablePlugin(this);
|
||||
getLogger().severe("Failed to initialize Redis; disabling HuskSync (" + getServer().getName() + ") v" + getDescription().getVersion());
|
||||
return;
|
||||
|
||||
@@ -26,6 +26,7 @@ public class BukkitRedisListener extends RedisListener {
|
||||
|
||||
// Initialize the listener on the bukkit server
|
||||
public BukkitRedisListener() {
|
||||
super();
|
||||
listen();
|
||||
}
|
||||
|
||||
|
||||
@@ -3,12 +3,12 @@ package me.william278.husksync;
|
||||
import me.william278.husksync.bungeecord.command.BungeeCommand;
|
||||
import me.william278.husksync.bungeecord.config.ConfigLoader;
|
||||
import me.william278.husksync.bungeecord.config.ConfigManager;
|
||||
import me.william278.husksync.proxy.data.DataManager;
|
||||
import me.william278.husksync.bungeecord.listener.BungeeEventListener;
|
||||
import me.william278.husksync.bungeecord.listener.BungeeRedisListener;
|
||||
import me.william278.husksync.migrator.MPDBMigrator;
|
||||
import me.william278.husksync.bungeecord.util.BungeeLogger;
|
||||
import me.william278.husksync.bungeecord.util.BungeeUpdateChecker;
|
||||
import me.william278.husksync.migrator.MPDBMigrator;
|
||||
import me.william278.husksync.proxy.data.DataManager;
|
||||
import me.william278.husksync.redis.RedisMessage;
|
||||
import me.william278.husksync.util.Logger;
|
||||
import net.byteflux.libby.BungeeLibraryManager;
|
||||
@@ -48,6 +48,8 @@ public final class HuskSyncBungeeCord extends Plugin {
|
||||
|
||||
public static MPDBMigrator mpdbMigrator;
|
||||
|
||||
public static BungeeRedisListener redisListener;
|
||||
|
||||
private Logger logger;
|
||||
|
||||
public Logger getBungeeLogger() {
|
||||
@@ -98,7 +100,8 @@ public final class HuskSyncBungeeCord extends Plugin {
|
||||
}
|
||||
|
||||
// Initialize the redis listener
|
||||
if (!new BungeeRedisListener().isActiveAndEnabled) {
|
||||
redisListener = new BungeeRedisListener();
|
||||
if (!redisListener.isActiveAndEnabled) {
|
||||
getBungeeLogger().severe("Failed to initialize Redis; HuskSync will now abort loading itself (" + getProxy().getName() + ") v" + getDescription().getVersion());
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -2,16 +2,16 @@ package me.william278.husksync.bungeecord.command;
|
||||
|
||||
import de.themoep.minedown.MineDown;
|
||||
import me.william278.husksync.HuskSyncBungeeCord;
|
||||
import me.william278.husksync.Server;
|
||||
import me.william278.husksync.bungeecord.util.BungeeUpdateChecker;
|
||||
import me.william278.husksync.proxy.command.HuskSyncCommand;
|
||||
import me.william278.husksync.util.MessageManager;
|
||||
import me.william278.husksync.PlayerData;
|
||||
import me.william278.husksync.Server;
|
||||
import me.william278.husksync.Settings;
|
||||
import me.william278.husksync.bungeecord.config.ConfigLoader;
|
||||
import me.william278.husksync.bungeecord.config.ConfigManager;
|
||||
import me.william278.husksync.bungeecord.util.BungeeUpdateChecker;
|
||||
import me.william278.husksync.migrator.MPDBMigrator;
|
||||
import me.william278.husksync.proxy.command.HuskSyncCommand;
|
||||
import me.william278.husksync.redis.RedisMessage;
|
||||
import me.william278.husksync.util.MessageManager;
|
||||
import net.md_5.bungee.api.CommandSender;
|
||||
import net.md_5.bungee.api.ProxyServer;
|
||||
import net.md_5.bungee.api.connection.ProxiedPlayer;
|
||||
@@ -301,7 +301,7 @@ public class BungeeCommand extends Command implements TabExecutor, HuskSyncComma
|
||||
HuskSyncBungeeCord.synchronisedServers)) {
|
||||
ProxyServer.getInstance().getScheduler().runAsync(plugin, () ->
|
||||
HuskSyncBungeeCord.mpdbMigrator.executeMigrationOperations(HuskSyncBungeeCord.dataManager,
|
||||
HuskSyncBungeeCord.synchronisedServers));
|
||||
HuskSyncBungeeCord.synchronisedServers, HuskSyncBungeeCord.redisListener));
|
||||
}
|
||||
}
|
||||
default -> sender.sendMessage(new MineDown("Error: Invalid argument for migration. Use \"husksync migrate\" to start the process").toComponent());
|
||||
|
||||
@@ -24,6 +24,7 @@ public class BungeeRedisListener extends RedisListener {
|
||||
|
||||
// Initialize the listener on the bungee
|
||||
public BungeeRedisListener() {
|
||||
super();
|
||||
listen();
|
||||
}
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ import me.william278.husksync.Settings;
|
||||
import me.william278.husksync.proxy.data.DataManager;
|
||||
import me.william278.husksync.proxy.data.sql.Database;
|
||||
import me.william278.husksync.proxy.data.sql.MySQL;
|
||||
import me.william278.husksync.redis.RedisListener;
|
||||
import me.william278.husksync.redis.RedisMessage;
|
||||
import me.william278.husksync.util.Logger;
|
||||
|
||||
@@ -95,7 +96,7 @@ public class MPDBMigrator {
|
||||
}
|
||||
|
||||
// Carry out the migration
|
||||
public void executeMigrationOperations(DataManager dataManager, HashSet<Server> synchronisedServers) {
|
||||
public void executeMigrationOperations(DataManager dataManager, HashSet<Server> synchronisedServers, RedisListener redisListener) {
|
||||
// Prepare the target database for insertion
|
||||
prepareTargetDatabase(dataManager);
|
||||
|
||||
@@ -109,7 +110,7 @@ public class MPDBMigrator {
|
||||
getExperienceData();
|
||||
|
||||
// Send the encoded data to the Bukkit servers for conversion
|
||||
sendEncodedData(synchronisedServers);
|
||||
sendEncodedData(synchronisedServers, redisListener);
|
||||
}
|
||||
|
||||
// Clear the new database out of current data
|
||||
@@ -200,7 +201,7 @@ public class MPDBMigrator {
|
||||
}
|
||||
}
|
||||
|
||||
private void sendEncodedData(HashSet<Server> synchronisedServers) {
|
||||
private void sendEncodedData(HashSet<Server> synchronisedServers, RedisListener redisListener) {
|
||||
for (Server processingServer : synchronisedServers) {
|
||||
if (processingServer.hasMySqlPlayerDataBridge()) {
|
||||
for (MPDBPlayerData data : mpdbPlayerData) {
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
package me.william278.husksync.redis;
|
||||
|
||||
import me.william278.husksync.Settings;
|
||||
import redis.clients.jedis.Jedis;
|
||||
import redis.clients.jedis.JedisClientConfig;
|
||||
import redis.clients.jedis.JedisPubSub;
|
||||
import redis.clients.jedis.*;
|
||||
import redis.clients.jedis.exceptions.JedisException;
|
||||
|
||||
import java.io.IOException;
|
||||
@@ -16,6 +14,18 @@ public abstract class RedisListener {
|
||||
*/
|
||||
public boolean isActiveAndEnabled;
|
||||
|
||||
/**
|
||||
* Pool of connections to the Redis server
|
||||
*/
|
||||
private static JedisPool jedisPool;
|
||||
|
||||
/**
|
||||
* Creates a new RedisListener and initialises the Redis connection
|
||||
*/
|
||||
public RedisListener() {
|
||||
jedisPool = new JedisPool(new JedisPoolConfig(), Settings.redisHost, Settings.redisPort);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle an incoming {@link RedisMessage}
|
||||
*
|
||||
@@ -31,20 +41,34 @@ public abstract class RedisListener {
|
||||
*/
|
||||
public abstract void log(Level level, String message);
|
||||
|
||||
/**
|
||||
* Fetch a connection to the Redis server from the JedisPool
|
||||
*
|
||||
* @return Jedis instance from the pool
|
||||
*/
|
||||
public static Jedis getJedisConnection() {
|
||||
return jedisPool.getResource();
|
||||
}
|
||||
|
||||
/**
|
||||
* Start the Redis listener
|
||||
*/
|
||||
public final void listen() {
|
||||
try (Jedis jedis = new Jedis(Settings.redisHost, Settings.redisPort)) {
|
||||
final String jedisPassword = Settings.redisPassword;
|
||||
jedis.connect();
|
||||
if (jedis.isConnected()) {
|
||||
final String jedisPassword = Settings.redisPassword;
|
||||
new Thread(() -> {
|
||||
try (Jedis jedis = getJedisConnection()) {
|
||||
if (!jedisPassword.equals("")) {
|
||||
jedis.auth(jedisPassword);
|
||||
}
|
||||
isActiveAndEnabled = true;
|
||||
log(Level.INFO, "Enabled Redis listener successfully!");
|
||||
new Thread(() -> jedis.subscribe(new JedisPubSub() {
|
||||
if (jedis.isConnected()) {
|
||||
isActiveAndEnabled = true;
|
||||
log(Level.INFO, "Enabled Redis listener successfully!");
|
||||
} else {
|
||||
isActiveAndEnabled = false;
|
||||
log(Level.SEVERE, "Connection to the Redis server could not be established, please check the credentials.");
|
||||
return;
|
||||
}
|
||||
jedis.subscribe(new JedisPubSub() {
|
||||
@Override
|
||||
public void onMessage(String channel, String message) {
|
||||
// Only accept messages to the HuskSync channel
|
||||
@@ -59,13 +83,11 @@ public abstract class RedisListener {
|
||||
log(Level.SEVERE, "Failed to deserialize message target");
|
||||
}
|
||||
}
|
||||
}, RedisMessage.REDIS_CHANNEL), "Redis Subscriber").start();
|
||||
} else {
|
||||
}, RedisMessage.REDIS_CHANNEL);
|
||||
} catch (JedisException | IllegalStateException e) {
|
||||
log(Level.SEVERE, "An exception occurred with the Jedis Subscriber!");
|
||||
isActiveAndEnabled = false;
|
||||
log(Level.SEVERE, "Failed to initialize the redis listener!");
|
||||
}
|
||||
} catch (JedisException e) {
|
||||
log(Level.SEVERE, "Failed to establish a connection to the Redis server!");
|
||||
}
|
||||
}, "Redis Subscriber").start();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,8 +22,9 @@ public class RedisMessage {
|
||||
|
||||
/**
|
||||
* Create a new RedisMessage
|
||||
* @param type The type of the message
|
||||
* @param target Who will receive this message
|
||||
*
|
||||
* @param type The type of the message
|
||||
* @param target Who will receive this message
|
||||
* @param messageData The message data elements
|
||||
*/
|
||||
public RedisMessage(MessageType type, MessageTarget target, String... messageData) {
|
||||
@@ -38,6 +39,7 @@ public class RedisMessage {
|
||||
|
||||
/**
|
||||
* Get a new RedisMessage from an incoming message string
|
||||
*
|
||||
* @param messageString The message string to parse
|
||||
*/
|
||||
public RedisMessage(String messageString) throws IOException, ClassNotFoundException {
|
||||
@@ -49,6 +51,7 @@ public class RedisMessage {
|
||||
|
||||
/**
|
||||
* Returns the full, formatted message string with type, target & data
|
||||
*
|
||||
* @return The fully formatted message
|
||||
*/
|
||||
private String getFullMessage() throws IOException {
|
||||
@@ -61,21 +64,23 @@ public class RedisMessage {
|
||||
* Send the redis message
|
||||
*/
|
||||
public void send() throws IOException {
|
||||
try (Jedis publisher = new Jedis(Settings.redisHost, Settings.redisPort)) {
|
||||
final String jedisPassword = Settings.redisPassword;
|
||||
publisher.connect();
|
||||
if (!jedisPassword.equals("")) {
|
||||
publisher.auth(jedisPassword);
|
||||
}
|
||||
publisher.publish(REDIS_CHANNEL, getFullMessage());
|
||||
try (Jedis publisher = RedisListener.getJedisConnection()) {
|
||||
final String jedisPassword = Settings.redisPassword;
|
||||
publisher.connect();
|
||||
if (!jedisPassword.equals("")) {
|
||||
publisher.auth(jedisPassword);
|
||||
}
|
||||
publisher.publish(REDIS_CHANNEL, getFullMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public String getMessageData() {
|
||||
return messageData;
|
||||
}
|
||||
|
||||
public String[] getMessageDataElements() { return messageData.split(MESSAGE_DATA_SEPARATOR); }
|
||||
public String[] getMessageDataElements() {
|
||||
return messageData.split(MESSAGE_DATA_SEPARATOR);
|
||||
}
|
||||
|
||||
public MessageType getMessageType() {
|
||||
return messageType;
|
||||
@@ -173,7 +178,9 @@ public class RedisMessage {
|
||||
/**
|
||||
* A record that defines the target of a plugin message; a spigot server or the proxy server(s). For Bukkit servers, the name of the server must also be specified
|
||||
*/
|
||||
public record MessageTarget(Settings.ServerType targetServerType, UUID targetPlayerUUID, String targetClusterId) implements Serializable { }
|
||||
public record MessageTarget(Settings.ServerType targetServerType, UUID targetPlayerUUID,
|
||||
String targetClusterId) implements Serializable {
|
||||
}
|
||||
|
||||
/**
|
||||
* Deserialize an object from a Base64 string
|
||||
|
||||
@@ -69,6 +69,8 @@ public class HuskSyncVelocity {
|
||||
|
||||
public static DataManager dataManager;
|
||||
|
||||
public static VelocityRedisListener redisListener;
|
||||
|
||||
public static MPDBMigrator mpdbMigrator;
|
||||
|
||||
private final Logger logger;
|
||||
@@ -146,7 +148,8 @@ public class HuskSyncVelocity {
|
||||
}
|
||||
|
||||
// Initialize the redis listener
|
||||
if (!new VelocityRedisListener().isActiveAndEnabled) {
|
||||
redisListener = new VelocityRedisListener();
|
||||
if (!redisListener.isActiveAndEnabled) {
|
||||
getVelocityLogger().severe("Failed to initialize Redis; HuskSync will now abort loading itself (Velocity) v" + VERSION);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -294,7 +294,7 @@ public class VelocityCommand implements SimpleCommand, HuskSyncCommand {
|
||||
HuskSyncVelocity.synchronisedServers)) {
|
||||
plugin.getProxyServer().getScheduler().buildTask(plugin, () ->
|
||||
HuskSyncVelocity.mpdbMigrator.executeMigrationOperations(HuskSyncVelocity.dataManager,
|
||||
HuskSyncVelocity.synchronisedServers)).schedule();
|
||||
HuskSyncVelocity.synchronisedServers, HuskSyncVelocity.redisListener)).schedule();
|
||||
}
|
||||
}
|
||||
default -> sender.sendMessage(new MineDown("Error: Invalid argument for migration. Use \"husksync migrate\" to start the process").toComponent());
|
||||
|
||||
@@ -23,6 +23,7 @@ public class VelocityRedisListener extends RedisListener {
|
||||
|
||||
// Initialize the listener on the bungee
|
||||
public VelocityRedisListener() {
|
||||
super();
|
||||
listen();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user