Merge pull request #215 from cogwheel0/socket-connection-reliability-improvements

socket-connection-reliability-improvements
This commit is contained in:
cogwheel
2025-12-03 14:09:08 +05:30
committed by GitHub
2 changed files with 70 additions and 25 deletions

View File

@@ -329,6 +329,7 @@ final apiServiceProvider = Provider<ApiService?>((ref) {
class SocketServiceManager extends _$SocketServiceManager { class SocketServiceManager extends _$SocketServiceManager {
SocketService? _service; SocketService? _service;
ProviderSubscription<String?>? _tokenSubscription; ProviderSubscription<String?>? _tokenSubscription;
int _connectToken = 0;
@override @override
FutureOr<SocketService?> build() async { FutureOr<SocketService?> build() async {
@@ -390,9 +391,11 @@ class SocketServiceManager extends _$SocketServiceManager {
} }
void _scheduleConnect(SocketService service) { void _scheduleConnect(SocketService service) {
final token = ++_connectToken;
WidgetsBinding.instance.addPostFrameCallback((_) async { WidgetsBinding.instance.addPostFrameCallback((_) async {
await Future.delayed(const Duration(milliseconds: 150));
if (!ref.mounted) return; if (!ref.mounted) return;
if (_connectToken != token) return;
if (!identical(_service, service)) return;
try { try {
unawaited(service.connect()); unawaited(service.connect());
} catch (_) {} } catch (_) {}
@@ -400,6 +403,7 @@ class SocketServiceManager extends _$SocketServiceManager {
} }
void _disposeService() { void _disposeService() {
_connectToken++;
if (_service == null) return; if (_service == null) return;
try { try {
_service!.dispose(); _service!.dispose();

View File

@@ -25,8 +25,10 @@ class SocketService with WidgetsBindingObserver {
final bool allowWebsocketUpgrade; final bool allowWebsocketUpgrade;
io.Socket? _socket; io.Socket? _socket;
String? _authToken; String? _authToken;
bool _isConnecting = false;
bool _isAppForeground = true; bool _isAppForeground = true;
Timer? _heartbeatTimer; Timer? _heartbeatTimer;
bool _forcePollingFallback = false;
/// Heartbeat interval matching OpenWebUI's 30-second interval. /// Heartbeat interval matching OpenWebUI's 30-second interval.
static const Duration _heartbeatInterval = Duration(seconds: 30); static const Duration _heartbeatInterval = Duration(seconds: 30);
@@ -69,6 +71,9 @@ class SocketService with WidgetsBindingObserver {
Future<void> connect({bool force = false}) async { Future<void> connect({bool force = false}) async {
if (_socket != null && _socket!.connected && !force) return; if (_socket != null && _socket!.connected && !force) return;
if (_isConnecting && !force) return;
_isConnecting = true;
DebugLogger.log( DebugLogger.log(
'Connecting to socket', 'Connecting to socket',
@@ -80,7 +85,11 @@ class SocketService with WidgetsBindingObserver {
_stopHeartbeat(); _stopHeartbeat();
try { try {
_socket?.dispose(); final existing = _socket;
if (existing != null) {
_unbindCoreSocketHandlers(existing);
existing.dispose();
}
} catch (_) {} } catch (_) {}
String base = serverConfig.url.replaceFirst(RegExp(r'/+$'), ''); String base = serverConfig.url.replaceFirst(RegExp(r'/+$'), '');
@@ -94,8 +103,10 @@ class SocketService with WidgetsBindingObserver {
} catch (_) {} } catch (_) {}
final path = '/ws/socket.io'; final path = '/ws/socket.io';
final usePollingOnly = !websocketOnly && !allowWebsocketUpgrade; final usePollingFallback = _forcePollingFallback;
final transports = websocketOnly final effectiveWebsocketOnly = websocketOnly && !usePollingFallback;
final usePollingOnly = !effectiveWebsocketOnly && !allowWebsocketUpgrade;
final transports = effectiveWebsocketOnly
? const ['websocket'] ? const ['websocket']
: usePollingOnly : usePollingOnly
? const ['polling'] ? const ['polling']
@@ -104,8 +115,8 @@ class SocketService with WidgetsBindingObserver {
final builder = io.OptionBuilder() final builder = io.OptionBuilder()
// Transport selection switches between WebSocket-only and polling fallback // Transport selection switches between WebSocket-only and polling fallback
.setTransports(transports) .setTransports(transports)
.setRememberUpgrade(!websocketOnly && allowWebsocketUpgrade) .setRememberUpgrade(!effectiveWebsocketOnly && allowWebsocketUpgrade)
.setUpgrade(!websocketOnly && allowWebsocketUpgrade) .setUpgrade(!effectiveWebsocketOnly && allowWebsocketUpgrade)
// Tune reconnect/backoff and timeouts // Tune reconnect/backoff and timeouts
// Note: In socket_io_client, pass a very large number for "unlimited" attempts. // Note: In socket_io_client, pass a very large number for "unlimited" attempts.
// Using double.maxFinite.toInt() ensures unlimited reconnection attempts. // Using double.maxFinite.toInt() ensures unlimited reconnection attempts.
@@ -154,13 +165,17 @@ class SocketService with WidgetsBindingObserver {
builder.setExtraHeaders(extraHeaders); builder.setExtraHeaders(extraHeaders);
} }
_socket = createSocketWithOptionalBadCertOverride( try {
base, _socket = createSocketWithOptionalBadCertOverride(
builder, base,
serverConfig, builder,
); serverConfig,
);
_bindCoreSocketHandlers(); _bindCoreSocketHandlers();
} catch (_) {
_isConnecting = false;
rethrow;
}
} }
/// Update the auth token used by the socket service. /// Update the auth token used by the socket service.
@@ -298,7 +313,11 @@ class SocketService with WidgetsBindingObserver {
void dispose() { void dispose() {
_stopHeartbeat(); _stopHeartbeat();
try { try {
_socket?.dispose(); final existing = _socket;
if (existing != null) {
_unbindCoreSocketHandlers(existing);
existing.dispose();
}
} catch (_) {} } catch (_) {}
_socket = null; _socket = null;
WidgetsBinding.instance.removeObserver(this); WidgetsBinding.instance.removeObserver(this);
@@ -327,17 +346,7 @@ class SocketService with WidgetsBindingObserver {
final socket = _socket; final socket = _socket;
if (socket == null) return; if (socket == null) return;
socket _unbindCoreSocketHandlers(socket);
..off('events', _handleChatEvent)
..off('chat-events', _handleChatEvent)
..off('events:channel', _handleChannelEvent)
..off('channel-events', _handleChannelEvent)
..off('connect', _handleConnect)
..off('connect_error', _handleConnectError)
..off('reconnect_attempt', _handleReconnectAttempt)
..off('reconnect', _handleReconnect)
..off('reconnect_failed', _handleReconnectFailed)
..off('disconnect', _handleDisconnect);
socket socket
..on('events', _handleChatEvent) ..on('events', _handleChatEvent)
@@ -352,7 +361,22 @@ class SocketService with WidgetsBindingObserver {
..on('disconnect', _handleDisconnect); ..on('disconnect', _handleDisconnect);
} }
void _unbindCoreSocketHandlers(io.Socket socket) {
socket
..off('events', _handleChatEvent)
..off('chat-events', _handleChatEvent)
..off('events:channel', _handleChannelEvent)
..off('channel-events', _handleChannelEvent)
..off('connect', _handleConnect)
..off('connect_error', _handleConnectError)
..off('reconnect_attempt', _handleReconnectAttempt)
..off('reconnect', _handleReconnect)
..off('reconnect_failed', _handleReconnectFailed)
..off('disconnect', _handleDisconnect);
}
void _handleConnect(dynamic _) { void _handleConnect(dynamic _) {
_isConnecting = false;
DebugLogger.log( DebugLogger.log(
'Socket connected', 'Socket connected',
scope: 'socket', scope: 'socket',
@@ -370,6 +394,7 @@ class SocketService with WidgetsBindingObserver {
} }
void _handleReconnectAttempt(dynamic attempt) { void _handleReconnectAttempt(dynamic attempt) {
_isConnecting = true;
DebugLogger.log( DebugLogger.log(
'Socket reconnection attempt', 'Socket reconnection attempt',
scope: 'socket', scope: 'socket',
@@ -378,6 +403,7 @@ class SocketService with WidgetsBindingObserver {
} }
void _handleReconnect(dynamic attempt) { void _handleReconnect(dynamic attempt) {
_isConnecting = false;
DebugLogger.log( DebugLogger.log(
'Socket reconnected', 'Socket reconnected',
scope: 'socket', scope: 'socket',
@@ -400,15 +426,29 @@ class SocketService with WidgetsBindingObserver {
} }
void _handleConnectError(dynamic err) { void _handleConnectError(dynamic err) {
_isConnecting = false;
DebugLogger.error( DebugLogger.error(
'Socket connection error', 'Socket connection error',
scope: 'socket', scope: 'socket',
error: err, error: err,
data: {'serverUrl': serverConfig.url}, data: {'serverUrl': serverConfig.url},
); );
// If WebSocket-only handshake fails, retry once with polling+websocket
// transports to avoid endless spinners (issue #172).
if (websocketOnly && !_forcePollingFallback) {
_forcePollingFallback = true;
DebugLogger.warning(
'WebSocket connect failed; retrying with polling fallback',
scope: 'socket',
data: {'reason': err?.toString()},
);
unawaited(connect(force: true));
}
} }
void _handleReconnectFailed(dynamic _) { void _handleReconnectFailed(dynamic _) {
_isConnecting = false;
DebugLogger.error( DebugLogger.error(
'Socket reconnection failed after all attempts', 'Socket reconnection failed after all attempts',
scope: 'socket', scope: 'socket',
@@ -417,6 +457,7 @@ class SocketService with WidgetsBindingObserver {
} }
void _handleDisconnect(dynamic reason) { void _handleDisconnect(dynamic reason) {
_isConnecting = false;
DebugLogger.warning( DebugLogger.warning(
'Socket disconnected', 'Socket disconnected',
scope: 'socket', scope: 'socket',