mirror of
https://github.com/WiIIiam278/HuskSync.git
synced 2025-12-26 01:59:20 +00:00
Use a continuous connection for pub/sub to avoid EndOfStreamException and increase exception logging verbosity
This commit is contained in:
@@ -2,6 +2,7 @@ package me.william278.husksync.redis;
|
||||
|
||||
import me.william278.husksync.Settings;
|
||||
import redis.clients.jedis.*;
|
||||
import redis.clients.jedis.exceptions.JedisConnectionException;
|
||||
import redis.clients.jedis.exceptions.JedisException;
|
||||
|
||||
import java.io.IOException;
|
||||
@@ -68,37 +69,53 @@ public abstract class RedisListener {
|
||||
*/
|
||||
public final void listen() {
|
||||
new Thread(() -> {
|
||||
try (Jedis jedis = getJedisConnection()) {
|
||||
if (jedis.isConnected()) {
|
||||
isActiveAndEnabled = true;
|
||||
log(Level.INFO, "Enabled Redis listener successfully!");
|
||||
} else {
|
||||
isActiveAndEnabled = false;
|
||||
log(Level.SEVERE, """
|
||||
Failed to establish connection to the Redis server.
|
||||
HuskSync will now abort initialization.
|
||||
Please check the credentials are correct and restart your server.""");
|
||||
return;
|
||||
}
|
||||
jedis.subscribe(new JedisPubSub() {
|
||||
@Override
|
||||
public void onMessage(String channel, String message) {
|
||||
// Only accept messages to the HuskSync channel
|
||||
if (!channel.equals(RedisMessage.REDIS_CHANNEL)) {
|
||||
return;
|
||||
}
|
||||
isActiveAndEnabled = true;
|
||||
while (isActiveAndEnabled) {
|
||||
|
||||
// Handle the message
|
||||
try {
|
||||
handleMessage(new RedisMessage(message));
|
||||
} catch (IOException | ClassNotFoundException e) {
|
||||
log(Level.SEVERE, "Failed to deserialize message target");
|
||||
Jedis subscriber;
|
||||
if (Settings.redisPassword.isEmpty()) {
|
||||
subscriber = new Jedis(Settings.redisHost,
|
||||
Settings.redisPort,
|
||||
0);
|
||||
} else {
|
||||
final JedisClientConfig config = DefaultJedisClientConfig.builder()
|
||||
.password(Settings.redisPassword)
|
||||
.timeoutMillis(0).build();
|
||||
|
||||
subscriber = new Jedis(Settings.redisHost,
|
||||
Settings.redisPort,
|
||||
config);
|
||||
}
|
||||
subscriber.connect();
|
||||
|
||||
log(Level.INFO, "Enabled Redis listener successfully!");
|
||||
try {
|
||||
subscriber.subscribe(new JedisPubSub() {
|
||||
@Override
|
||||
public void onMessage(String channel, String message) {
|
||||
// Only accept messages to the HuskSync channel
|
||||
if (!channel.equals(RedisMessage.REDIS_CHANNEL)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Handle the message
|
||||
try {
|
||||
handleMessage(new RedisMessage(message));
|
||||
} catch (IOException | ClassNotFoundException e) {
|
||||
log(Level.SEVERE, "Failed to deserialize message target");
|
||||
}
|
||||
}
|
||||
}
|
||||
}, RedisMessage.REDIS_CHANNEL);
|
||||
} catch (JedisException | IllegalStateException e) {
|
||||
log(Level.SEVERE, "An exception occurred with the Jedis Subscriber!");
|
||||
isActiveAndEnabled = false;
|
||||
}, RedisMessage.REDIS_CHANNEL);
|
||||
} catch (JedisConnectionException connectionException) {
|
||||
log(Level.SEVERE, "A connection exception occurred with the Jedis listener");
|
||||
connectionException.printStackTrace();
|
||||
} catch (JedisException jedisException) {
|
||||
isActiveAndEnabled = false;
|
||||
log(Level.SEVERE, "An exception occurred with the Jedis listener");
|
||||
jedisException.printStackTrace();
|
||||
} finally {
|
||||
subscriber.close();
|
||||
}
|
||||
}
|
||||
}, "Redis Subscriber").start();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user