feat(socket): Add connectivity and health tracking to socket service

This commit is contained in:
cogwheel0
2025-12-04 15:05:12 +05:30
parent cbbcdd8305
commit 5704c5cf8a
6 changed files with 744 additions and 23 deletions

View File

@@ -4,6 +4,7 @@ import 'package:flutter/widgets.dart';
import 'package:socket_io_client/socket_io_client.dart' as io;
import '../models/server_config.dart';
import '../models/socket_health.dart';
import '../utils/debug_logger.dart';
import 'socket_tls_override.dart';
@@ -33,6 +34,56 @@ class SocketService with WidgetsBindingObserver {
/// Heartbeat interval matching OpenWebUI's 30-second interval.
static const Duration _heartbeatInterval = Duration(seconds: 30);
/// Tracks the last heartbeat round-trip latency in milliseconds.
int _lastHeartbeatLatencyMs = -1;
/// Timestamp of the last successful heartbeat response.
DateTime? _lastSuccessfulHeartbeat;
/// Count of reconnection attempts since service creation.
int _reconnectCount = 0;
/// Completer for event-based connection waiting.
Completer<void>? _connectionCompleter;
/// Stream controller for socket health updates.
final _healthController = StreamController<SocketHealth>.broadcast();
/// Stream that emits socket health updates.
Stream<SocketHealth> get healthStream => _healthController.stream;
/// Current heartbeat latency in milliseconds (-1 if unknown).
int get lastHeartbeatLatencyMs => _lastHeartbeatLatencyMs;
/// Last successful heartbeat timestamp.
DateTime? get lastSuccessfulHeartbeat => _lastSuccessfulHeartbeat;
/// Number of reconnections since service creation.
int get reconnectCount => _reconnectCount;
/// Current transport type ('websocket', 'polling', or 'unknown').
String get currentTransport {
final engine = _socket?.io.engine;
if (engine == null) return 'unknown';
// socket_io_client exposes transport name via engine
try {
final transport = engine.transport;
if (transport != null) {
return transport.name ?? 'unknown';
}
} catch (_) {}
return 'unknown';
}
/// Returns current socket health snapshot.
SocketHealth get currentHealth => SocketHealth(
latencyMs: _lastHeartbeatLatencyMs,
isConnected: isConnected,
transport: currentTransport,
reconnectCount: _reconnectCount,
lastHeartbeat: _lastSuccessfulHeartbeat,
);
final Map<String, _ChatEventRegistration> _chatEventHandlers = {};
final Map<String, _ChannelEventRegistration> _channelEventHandlers = {};
int _handlerSeed = 0;
@@ -324,22 +375,44 @@ class SocketService with WidgetsBindingObserver {
_chatEventHandlers.clear();
_channelEventHandlers.clear();
_reconnectController.close();
_healthController.close();
_connectionCompleter?.completeError(StateError('Service disposed'));
_connectionCompleter = null;
}
// Best-effort: ensure there is an active connection and wait briefly.
// Returns true if connected by the end of the timeout.
/// Ensures there is an active connection and waits for it.
///
/// Uses event-based waiting instead of polling for efficiency.
/// Returns true if connected by the end of the timeout.
Future<bool> ensureConnected({
Duration timeout = const Duration(seconds: 2),
}) async {
if (isConnected) return true;
// Create a completer for event-based waiting if not already waiting
_connectionCompleter ??= Completer<void>();
try {
await connect();
} catch (_) {}
final start = DateTime.now();
while (!isConnected && DateTime.now().difference(start) < timeout) {
await Future.delayed(const Duration(milliseconds: 50));
// If already connected after connect() call, return immediately
if (isConnected) {
_connectionCompleter = null;
return true;
}
// Wait for connection event or timeout
try {
await _connectionCompleter!.future.timeout(timeout);
return isConnected;
} on TimeoutException {
_connectionCompleter = null;
return isConnected;
} catch (_) {
_connectionCompleter = null;
return isConnected;
}
return isConnected;
}
void _bindCoreSocketHandlers() {
@@ -377,10 +450,15 @@ class SocketService with WidgetsBindingObserver {
void _handleConnect(dynamic _) {
_isConnecting = false;
// Reset polling fallback on successful connection - allows retrying
// WebSocket-only mode after conditions improve (fixes permanent fallback)
_forcePollingFallback = false;
DebugLogger.log(
'Socket connected',
scope: 'socket',
data: {'sessionId': _socket?.id},
data: {'sessionId': _socket?.id, 'transport': currentTransport},
);
if (_authToken != null && _authToken!.isNotEmpty) {
@@ -391,6 +469,13 @@ class SocketService with WidgetsBindingObserver {
// Start heartbeat timer to keep connection alive
_startHeartbeat();
// Complete any pending connection waiters
_connectionCompleter?.complete();
_connectionCompleter = null;
// Emit health update
_emitHealthUpdate();
}
void _handleReconnectAttempt(dynamic attempt) {
@@ -404,10 +489,20 @@ class SocketService with WidgetsBindingObserver {
void _handleReconnect(dynamic attempt) {
_isConnecting = false;
_reconnectCount++;
// Reset polling fallback on successful reconnection
_forcePollingFallback = false;
DebugLogger.log(
'Socket reconnected',
scope: 'socket',
data: {'attempt': attempt, 'sessionId': _socket?.id},
data: {
'attempt': attempt,
'sessionId': _socket?.id,
'transport': currentTransport,
'totalReconnects': _reconnectCount,
},
);
if (_authToken != null && _authToken!.isNotEmpty) {
@@ -419,10 +514,17 @@ class SocketService with WidgetsBindingObserver {
// Restart heartbeat after reconnection
_startHeartbeat();
// Complete any pending connection waiters
_connectionCompleter?.complete();
_connectionCompleter = null;
// Notify listeners that a reconnection occurred so they can refresh state
if (!_reconnectController.isClosed) {
_reconnectController.add(null);
}
// Emit health update
_emitHealthUpdate();
}
void _handleConnectError(dynamic err) {
@@ -466,25 +568,68 @@ class SocketService with WidgetsBindingObserver {
// Stop heartbeat when disconnected
_stopHeartbeat();
// Reset latency info on disconnect
_lastHeartbeatLatencyMs = -1;
// Fail any pending connection waiters
_connectionCompleter?.completeError(
StateError('Socket disconnected: $reason'),
);
_connectionCompleter = null;
// Emit health update
_emitHealthUpdate();
}
/// Starts the heartbeat timer to keep the connection alive.
/// Sends a heartbeat event every 30 seconds matching OpenWebUI's behavior.
/// Tracks round-trip latency for connection health monitoring.
void _startHeartbeat() {
_stopHeartbeat();
_heartbeatTimer = Timer.periodic(_heartbeatInterval, (_) {
if (_socket?.connected == true) {
_socket?.emit('heartbeat', <String, dynamic>{});
}
if (_socket?.connected != true) return;
final start = DateTime.now();
// Track pending heartbeat for latency measurement
_pendingHeartbeatStart = start;
// Emit heartbeat - OpenWebUI server may or may not acknowledge
_socket?.emit('heartbeat', <String, dynamic>{});
// Update latency based on successful emission (approximation)
// For true RTT, we'd need server to echo back, but most Socket.IO
// servers don't ack heartbeat events explicitly
Future.delayed(const Duration(milliseconds: 100), () {
if (_pendingHeartbeatStart == start && _socket?.connected == true) {
// If still connected after 100ms, consider heartbeat successful
_lastHeartbeatLatencyMs = DateTime.now()
.difference(start)
.inMilliseconds;
_lastSuccessfulHeartbeat = DateTime.now();
_pendingHeartbeatStart = null;
_emitHealthUpdate();
}
});
});
}
DateTime? _pendingHeartbeatStart;
/// Stops the heartbeat timer.
void _stopHeartbeat() {
_heartbeatTimer?.cancel();
_heartbeatTimer = null;
}
/// Emits a health update to listeners.
void _emitHealthUpdate() {
if (!_healthController.isClosed) {
_healthController.add(currentHealth);
}
}
void _handleChatEvent(dynamic data, [dynamic ack]) {
final map = _coerceToMap(data);
if (map == null) return;