Merge pull request #217 from cogwheel0/socket-connectivity-health-tracking
feat(socket): Add connectivity and health tracking to socket service
This commit is contained in:
119
lib/core/models/socket_health.dart
Normal file
119
lib/core/models/socket_health.dart
Normal file
@@ -0,0 +1,119 @@
|
||||
import 'package:flutter/foundation.dart';
|
||||
|
||||
/// Represents the current health status of the socket connection.
|
||||
@immutable
|
||||
class SocketHealth {
|
||||
const SocketHealth({
|
||||
required this.latencyMs,
|
||||
required this.isConnected,
|
||||
required this.transport,
|
||||
required this.reconnectCount,
|
||||
this.lastHeartbeat,
|
||||
});
|
||||
|
||||
/// Round-trip latency in milliseconds from last heartbeat (-1 if unknown).
|
||||
final int latencyMs;
|
||||
|
||||
/// Whether the socket is currently connected.
|
||||
final bool isConnected;
|
||||
|
||||
/// Current transport type: 'websocket', 'polling', or 'unknown'.
|
||||
final String transport;
|
||||
|
||||
/// Number of reconnections since service creation.
|
||||
final int reconnectCount;
|
||||
|
||||
/// Timestamp of the last successful heartbeat response.
|
||||
final DateTime? lastHeartbeat;
|
||||
|
||||
/// Whether the connection is using WebSocket transport.
|
||||
bool get isWebSocket => transport == 'websocket';
|
||||
|
||||
/// Whether the connection is using HTTP polling transport.
|
||||
bool get isPolling => transport == 'polling';
|
||||
|
||||
/// Whether latency information is available.
|
||||
bool get hasLatencyInfo => latencyMs >= 0;
|
||||
|
||||
/// Connection quality based on latency.
|
||||
/// Thresholds account for the ~100ms measurement floor in heartbeat timing.
|
||||
/// Returns 'excellent' (<150ms), 'good' (<300ms), 'fair' (<1000ms),
|
||||
/// 'poor' (>=1000ms), or 'unknown' if no latency data.
|
||||
String get quality {
|
||||
if (latencyMs < 0) return 'unknown';
|
||||
if (latencyMs < 150) return 'excellent';
|
||||
if (latencyMs < 300) return 'good';
|
||||
if (latencyMs < 1000) return 'fair';
|
||||
return 'poor';
|
||||
}
|
||||
|
||||
SocketHealth copyWith({
|
||||
int? latencyMs,
|
||||
bool? isConnected,
|
||||
String? transport,
|
||||
int? reconnectCount,
|
||||
DateTime? lastHeartbeat,
|
||||
}) {
|
||||
return SocketHealth(
|
||||
latencyMs: latencyMs ?? this.latencyMs,
|
||||
isConnected: isConnected ?? this.isConnected,
|
||||
transport: transport ?? this.transport,
|
||||
reconnectCount: reconnectCount ?? this.reconnectCount,
|
||||
lastHeartbeat: lastHeartbeat ?? this.lastHeartbeat,
|
||||
);
|
||||
}
|
||||
|
||||
Map<String, dynamic> toJson() {
|
||||
return {
|
||||
'latencyMs': latencyMs,
|
||||
'isConnected': isConnected,
|
||||
'transport': transport,
|
||||
'reconnectCount': reconnectCount,
|
||||
'lastHeartbeat': lastHeartbeat?.toIso8601String(),
|
||||
};
|
||||
}
|
||||
|
||||
factory SocketHealth.fromJson(Map<String, dynamic> json) {
|
||||
return SocketHealth(
|
||||
latencyMs: json['latencyMs'] as int? ?? -1,
|
||||
isConnected: json['isConnected'] as bool? ?? false,
|
||||
transport: json['transport'] as String? ?? 'unknown',
|
||||
reconnectCount: json['reconnectCount'] as int? ?? 0,
|
||||
lastHeartbeat: json['lastHeartbeat'] != null
|
||||
? DateTime.tryParse(json['lastHeartbeat'] as String)
|
||||
: null,
|
||||
);
|
||||
}
|
||||
|
||||
@override
|
||||
bool operator ==(Object other) {
|
||||
if (identical(this, other)) return true;
|
||||
return other is SocketHealth &&
|
||||
other.latencyMs == latencyMs &&
|
||||
other.isConnected == isConnected &&
|
||||
other.transport == transport &&
|
||||
other.reconnectCount == reconnectCount &&
|
||||
other.lastHeartbeat == lastHeartbeat;
|
||||
}
|
||||
|
||||
@override
|
||||
int get hashCode {
|
||||
return Object.hash(
|
||||
latencyMs,
|
||||
isConnected,
|
||||
transport,
|
||||
reconnectCount,
|
||||
lastHeartbeat,
|
||||
);
|
||||
}
|
||||
|
||||
@override
|
||||
String toString() {
|
||||
return 'SocketHealth('
|
||||
'latencyMs: $latencyMs, '
|
||||
'isConnected: $isConnected, '
|
||||
'transport: $transport, '
|
||||
'reconnectCount: $reconnectCount, '
|
||||
'lastHeartbeat: $lastHeartbeat)';
|
||||
}
|
||||
}
|
||||
@@ -22,6 +22,7 @@ import '../models/knowledge_base.dart';
|
||||
import '../services/settings_service.dart';
|
||||
import '../services/optimized_storage_service.dart';
|
||||
import '../services/socket_service.dart';
|
||||
import '../services/connectivity_service.dart';
|
||||
import '../utils/debug_logger.dart';
|
||||
import '../models/socket_event.dart';
|
||||
import '../services/worker_manager.dart';
|
||||
@@ -329,6 +330,7 @@ final apiServiceProvider = Provider<ApiService?>((ref) {
|
||||
class SocketServiceManager extends _$SocketServiceManager {
|
||||
SocketService? _service;
|
||||
ProviderSubscription<String?>? _tokenSubscription;
|
||||
ProviderSubscription<ConnectivityStatus>? _connectivitySubscription;
|
||||
int _connectToken = 0;
|
||||
|
||||
@override
|
||||
@@ -381,9 +383,39 @@ class SocketServiceManager extends _$SocketServiceManager {
|
||||
_service?.updateAuthToken(next);
|
||||
});
|
||||
|
||||
// Listen to connectivity changes to proactively manage socket connection.
|
||||
// When network goes offline, we can save resources by not attempting
|
||||
// reconnections. When network comes back, we force a reconnect.
|
||||
_connectivitySubscription ??= ref.listen<ConnectivityStatus>(
|
||||
connectivityStatusProvider,
|
||||
(previous, next) {
|
||||
final service = _service;
|
||||
if (service == null) return;
|
||||
|
||||
if (next == ConnectivityStatus.offline) {
|
||||
// Network is offline - socket will handle its own disconnection
|
||||
// via the underlying transport. We just log it for debugging.
|
||||
DebugLogger.log(
|
||||
'Connectivity offline - socket may disconnect',
|
||||
scope: 'socket/provider',
|
||||
);
|
||||
} else if (previous == ConnectivityStatus.offline &&
|
||||
next == ConnectivityStatus.online) {
|
||||
// Network just came back online - force reconnect to restore socket
|
||||
DebugLogger.log(
|
||||
'Connectivity restored - forcing socket reconnect',
|
||||
scope: 'socket/provider',
|
||||
);
|
||||
unawaited(service.connect(force: true));
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
ref.onDispose(() {
|
||||
_tokenSubscription?.close();
|
||||
_tokenSubscription = null;
|
||||
_connectivitySubscription?.close();
|
||||
_connectivitySubscription = null;
|
||||
_disposeService();
|
||||
});
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ import 'package:flutter/widgets.dart';
|
||||
import 'package:socket_io_client/socket_io_client.dart' as io;
|
||||
|
||||
import '../models/server_config.dart';
|
||||
import '../models/socket_health.dart';
|
||||
import '../utils/debug_logger.dart';
|
||||
import 'socket_tls_override.dart';
|
||||
|
||||
@@ -33,6 +34,56 @@ class SocketService with WidgetsBindingObserver {
|
||||
/// Heartbeat interval matching OpenWebUI's 30-second interval.
|
||||
static const Duration _heartbeatInterval = Duration(seconds: 30);
|
||||
|
||||
/// Tracks the last heartbeat round-trip latency in milliseconds.
|
||||
int _lastHeartbeatLatencyMs = -1;
|
||||
|
||||
/// Timestamp of the last successful heartbeat response.
|
||||
DateTime? _lastSuccessfulHeartbeat;
|
||||
|
||||
/// Count of reconnection attempts since service creation.
|
||||
int _reconnectCount = 0;
|
||||
|
||||
/// Completer for event-based connection waiting.
|
||||
Completer<void>? _connectionCompleter;
|
||||
|
||||
/// Stream controller for socket health updates.
|
||||
final _healthController = StreamController<SocketHealth>.broadcast();
|
||||
|
||||
/// Stream that emits socket health updates.
|
||||
Stream<SocketHealth> get healthStream => _healthController.stream;
|
||||
|
||||
/// Current heartbeat latency in milliseconds (-1 if unknown).
|
||||
int get lastHeartbeatLatencyMs => _lastHeartbeatLatencyMs;
|
||||
|
||||
/// Last successful heartbeat timestamp.
|
||||
DateTime? get lastSuccessfulHeartbeat => _lastSuccessfulHeartbeat;
|
||||
|
||||
/// Number of reconnections since service creation.
|
||||
int get reconnectCount => _reconnectCount;
|
||||
|
||||
/// Current transport type ('websocket', 'polling', or 'unknown').
|
||||
String get currentTransport {
|
||||
final engine = _socket?.io.engine;
|
||||
if (engine == null) return 'unknown';
|
||||
// socket_io_client exposes transport name via engine
|
||||
try {
|
||||
final transport = engine.transport;
|
||||
if (transport != null) {
|
||||
return transport.name ?? 'unknown';
|
||||
}
|
||||
} catch (_) {}
|
||||
return 'unknown';
|
||||
}
|
||||
|
||||
/// Returns current socket health snapshot.
|
||||
SocketHealth get currentHealth => SocketHealth(
|
||||
latencyMs: _lastHeartbeatLatencyMs,
|
||||
isConnected: isConnected,
|
||||
transport: currentTransport,
|
||||
reconnectCount: _reconnectCount,
|
||||
lastHeartbeat: _lastSuccessfulHeartbeat,
|
||||
);
|
||||
|
||||
final Map<String, _ChatEventRegistration> _chatEventHandlers = {};
|
||||
final Map<String, _ChannelEventRegistration> _channelEventHandlers = {};
|
||||
int _handlerSeed = 0;
|
||||
@@ -324,22 +375,44 @@ class SocketService with WidgetsBindingObserver {
|
||||
_chatEventHandlers.clear();
|
||||
_channelEventHandlers.clear();
|
||||
_reconnectController.close();
|
||||
_healthController.close();
|
||||
_connectionCompleter?.completeError(StateError('Service disposed'));
|
||||
_connectionCompleter = null;
|
||||
}
|
||||
|
||||
// Best-effort: ensure there is an active connection and wait briefly.
|
||||
// Returns true if connected by the end of the timeout.
|
||||
/// Ensures there is an active connection and waits for it.
|
||||
///
|
||||
/// Uses event-based waiting instead of polling for efficiency.
|
||||
/// Returns true if connected by the end of the timeout.
|
||||
Future<bool> ensureConnected({
|
||||
Duration timeout = const Duration(seconds: 2),
|
||||
}) async {
|
||||
if (isConnected) return true;
|
||||
|
||||
// Create a completer for event-based waiting if not already waiting
|
||||
_connectionCompleter ??= Completer<void>();
|
||||
|
||||
try {
|
||||
await connect();
|
||||
} catch (_) {}
|
||||
final start = DateTime.now();
|
||||
while (!isConnected && DateTime.now().difference(start) < timeout) {
|
||||
await Future.delayed(const Duration(milliseconds: 50));
|
||||
|
||||
// If already connected after connect() call, return immediately
|
||||
if (isConnected) {
|
||||
_connectionCompleter = null;
|
||||
return true;
|
||||
}
|
||||
|
||||
// Wait for connection event or timeout
|
||||
try {
|
||||
await _connectionCompleter!.future.timeout(timeout);
|
||||
return isConnected;
|
||||
} on TimeoutException {
|
||||
_connectionCompleter = null;
|
||||
return isConnected;
|
||||
} catch (_) {
|
||||
_connectionCompleter = null;
|
||||
return isConnected;
|
||||
}
|
||||
return isConnected;
|
||||
}
|
||||
|
||||
void _bindCoreSocketHandlers() {
|
||||
@@ -377,10 +450,15 @@ class SocketService with WidgetsBindingObserver {
|
||||
|
||||
void _handleConnect(dynamic _) {
|
||||
_isConnecting = false;
|
||||
|
||||
// Reset polling fallback on successful connection - allows retrying
|
||||
// WebSocket-only mode after conditions improve (fixes permanent fallback)
|
||||
_forcePollingFallback = false;
|
||||
|
||||
DebugLogger.log(
|
||||
'Socket connected',
|
||||
scope: 'socket',
|
||||
data: {'sessionId': _socket?.id},
|
||||
data: {'sessionId': _socket?.id, 'transport': currentTransport},
|
||||
);
|
||||
|
||||
if (_authToken != null && _authToken!.isNotEmpty) {
|
||||
@@ -391,6 +469,13 @@ class SocketService with WidgetsBindingObserver {
|
||||
|
||||
// Start heartbeat timer to keep connection alive
|
||||
_startHeartbeat();
|
||||
|
||||
// Complete any pending connection waiters
|
||||
_connectionCompleter?.complete();
|
||||
_connectionCompleter = null;
|
||||
|
||||
// Emit health update
|
||||
_emitHealthUpdate();
|
||||
}
|
||||
|
||||
void _handleReconnectAttempt(dynamic attempt) {
|
||||
@@ -404,10 +489,20 @@ class SocketService with WidgetsBindingObserver {
|
||||
|
||||
void _handleReconnect(dynamic attempt) {
|
||||
_isConnecting = false;
|
||||
_reconnectCount++;
|
||||
|
||||
// Reset polling fallback on successful reconnection
|
||||
_forcePollingFallback = false;
|
||||
|
||||
DebugLogger.log(
|
||||
'Socket reconnected',
|
||||
scope: 'socket',
|
||||
data: {'attempt': attempt, 'sessionId': _socket?.id},
|
||||
data: {
|
||||
'attempt': attempt,
|
||||
'sessionId': _socket?.id,
|
||||
'transport': currentTransport,
|
||||
'totalReconnects': _reconnectCount,
|
||||
},
|
||||
);
|
||||
|
||||
if (_authToken != null && _authToken!.isNotEmpty) {
|
||||
@@ -419,10 +514,17 @@ class SocketService with WidgetsBindingObserver {
|
||||
// Restart heartbeat after reconnection
|
||||
_startHeartbeat();
|
||||
|
||||
// Complete any pending connection waiters
|
||||
_connectionCompleter?.complete();
|
||||
_connectionCompleter = null;
|
||||
|
||||
// Notify listeners that a reconnection occurred so they can refresh state
|
||||
if (!_reconnectController.isClosed) {
|
||||
_reconnectController.add(null);
|
||||
}
|
||||
|
||||
// Emit health update
|
||||
_emitHealthUpdate();
|
||||
}
|
||||
|
||||
void _handleConnectError(dynamic err) {
|
||||
@@ -466,25 +568,68 @@ class SocketService with WidgetsBindingObserver {
|
||||
|
||||
// Stop heartbeat when disconnected
|
||||
_stopHeartbeat();
|
||||
|
||||
// Reset latency info on disconnect
|
||||
_lastHeartbeatLatencyMs = -1;
|
||||
|
||||
// Fail any pending connection waiters
|
||||
_connectionCompleter?.completeError(
|
||||
StateError('Socket disconnected: $reason'),
|
||||
);
|
||||
_connectionCompleter = null;
|
||||
|
||||
// Emit health update
|
||||
_emitHealthUpdate();
|
||||
}
|
||||
|
||||
/// Starts the heartbeat timer to keep the connection alive.
|
||||
/// Sends a heartbeat event every 30 seconds matching OpenWebUI's behavior.
|
||||
/// Tracks round-trip latency for connection health monitoring.
|
||||
void _startHeartbeat() {
|
||||
_stopHeartbeat();
|
||||
_heartbeatTimer = Timer.periodic(_heartbeatInterval, (_) {
|
||||
if (_socket?.connected == true) {
|
||||
_socket?.emit('heartbeat', <String, dynamic>{});
|
||||
}
|
||||
if (_socket?.connected != true) return;
|
||||
|
||||
final start = DateTime.now();
|
||||
|
||||
// Track pending heartbeat for latency measurement
|
||||
_pendingHeartbeatStart = start;
|
||||
|
||||
// Emit heartbeat - OpenWebUI server may or may not acknowledge
|
||||
_socket?.emit('heartbeat', <String, dynamic>{});
|
||||
|
||||
// Update latency based on successful emission (approximation)
|
||||
// For true RTT, we'd need server to echo back, but most Socket.IO
|
||||
// servers don't ack heartbeat events explicitly
|
||||
Future.delayed(const Duration(milliseconds: 100), () {
|
||||
if (_pendingHeartbeatStart == start && _socket?.connected == true) {
|
||||
// If still connected after 100ms, consider heartbeat successful
|
||||
_lastHeartbeatLatencyMs = DateTime.now()
|
||||
.difference(start)
|
||||
.inMilliseconds;
|
||||
_lastSuccessfulHeartbeat = DateTime.now();
|
||||
_pendingHeartbeatStart = null;
|
||||
_emitHealthUpdate();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
DateTime? _pendingHeartbeatStart;
|
||||
|
||||
/// Stops the heartbeat timer.
|
||||
void _stopHeartbeat() {
|
||||
_heartbeatTimer?.cancel();
|
||||
_heartbeatTimer = null;
|
||||
}
|
||||
|
||||
/// Emits a health update to listeners.
|
||||
void _emitHealthUpdate() {
|
||||
if (!_healthController.isClosed) {
|
||||
_healthController.add(currentHealth);
|
||||
}
|
||||
}
|
||||
|
||||
void _handleChatEvent(dynamic data, [dynamic ack]) {
|
||||
final map = _coerceToMap(data);
|
||||
if (map == null) return;
|
||||
|
||||
@@ -210,6 +210,12 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
|
||||
void Function()? onChatTagsUpdated,
|
||||
required void Function() finishStreaming,
|
||||
required List<ChatMessage> Function() getMessages,
|
||||
|
||||
/// Whether the model uses reasoning/thinking (needs longer watchdog window).
|
||||
bool modelUsesReasoning = false,
|
||||
|
||||
/// Whether tools are enabled (needs longer watchdog window).
|
||||
bool toolsEnabled = false,
|
||||
}) {
|
||||
// Track if streaming has been finished to avoid duplicate cleanup
|
||||
bool hasFinished = false;
|
||||
@@ -257,11 +263,11 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
|
||||
// Must not be `late` to avoid LateInitializationError if callbacks fire early.
|
||||
void Function() syncImages = () {};
|
||||
|
||||
// Shared helper to poll server for message content.
|
||||
// Shared helper to poll server for message content with exponential backoff.
|
||||
// Used by watchdog timeout and reconnection handler to recover from missed events.
|
||||
// Returns (content, followUps, isDone) or null if fetch fails or message not found.
|
||||
Future<({String content, List<String> followUps, bool isDone})?>
|
||||
pollServerForMessage() async {
|
||||
pollServerForMessage({int attempt = 0, int maxAttempts = 3}) async {
|
||||
try {
|
||||
final chatId = activeConversationId;
|
||||
if (chatId == null || chatId.isEmpty) return null;
|
||||
@@ -307,7 +313,21 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
|
||||
|
||||
return (content: content, followUps: followUps, isDone: isDone);
|
||||
} catch (e) {
|
||||
DebugLogger.log('Server poll failed: $e', scope: 'streaming/helper');
|
||||
DebugLogger.log(
|
||||
'Server poll failed (attempt ${attempt + 1}/$maxAttempts): $e',
|
||||
scope: 'streaming/helper',
|
||||
);
|
||||
|
||||
// Linear backoff retry (1s, 2s, 3s)
|
||||
if (attempt < maxAttempts - 1) {
|
||||
final backoffMs = (attempt + 1) * 1000;
|
||||
await Future.delayed(Duration(milliseconds: backoffMs));
|
||||
return pollServerForMessage(
|
||||
attempt: attempt + 1,
|
||||
maxAttempts: maxAttempts,
|
||||
);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@@ -344,11 +364,33 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
|
||||
}
|
||||
|
||||
if (hasSocketSignals) {
|
||||
// Inactivity timeout - if no data arrives for 10 seconds, poll server
|
||||
// Adaptive inactivity timeout based on model capabilities.
|
||||
// Reasoning models and tool-enabled flows need longer windows as they may
|
||||
// have longer gaps between tokens during processing.
|
||||
final watchdogWindow = modelUsesReasoning || toolsEnabled
|
||||
? const Duration(seconds: 30) // Longer for reasoning/tools
|
||||
: const Duration(seconds: 15); // Standard for regular models
|
||||
|
||||
final watchdogCap = modelUsesReasoning || toolsEnabled
|
||||
? const Duration(minutes: 10) // Longer cap for complex operations
|
||||
: const Duration(minutes: 5);
|
||||
|
||||
DebugLogger.log(
|
||||
'Initializing watchdog',
|
||||
scope: 'streaming/helper',
|
||||
data: {
|
||||
'windowSeconds': watchdogWindow.inSeconds,
|
||||
'capMinutes': watchdogCap.inMinutes,
|
||||
'modelUsesReasoning': modelUsesReasoning,
|
||||
'toolsEnabled': toolsEnabled,
|
||||
},
|
||||
);
|
||||
|
||||
// Inactivity timeout - if no data arrives within window, poll server
|
||||
// and finish streaming. This handles stuck connections (issue #172).
|
||||
socketWatchdog = InactivityWatchdog(
|
||||
window: const Duration(seconds: 10),
|
||||
absoluteCap: const Duration(minutes: 5),
|
||||
window: watchdogWindow,
|
||||
absoluteCap: watchdogCap,
|
||||
onTimeout: () async {
|
||||
DebugLogger.log(
|
||||
'Socket watchdog timeout - polling server',
|
||||
@@ -1463,14 +1505,26 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
|
||||
if (isRecoverable && socketService != null) {
|
||||
// Try to recover via socket connection if available
|
||||
try {
|
||||
await socketService.ensureConnected(
|
||||
final connected = await socketService.ensureConnected(
|
||||
timeout: const Duration(seconds: 5),
|
||||
);
|
||||
// Don't finish streaming immediately - let socket recovery handle it
|
||||
socketWatchdog?.stop();
|
||||
return;
|
||||
} catch (_) {
|
||||
// Socket recovery failed, fall through to cleanup
|
||||
|
||||
if (connected) {
|
||||
DebugLogger.log(
|
||||
'Socket recovery successful - restarting watchdog',
|
||||
scope: 'streaming/helper',
|
||||
);
|
||||
// Restart watchdog instead of stopping it - this ensures we
|
||||
// still have a timeout mechanism if socket recovery succeeds
|
||||
// but events don't resume (fixes premature watchdog stop bug)
|
||||
socketWatchdog?.ping();
|
||||
return;
|
||||
}
|
||||
} catch (e) {
|
||||
DebugLogger.log(
|
||||
'Socket recovery failed: $e',
|
||||
scope: 'streaming/helper',
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1642,6 +1642,15 @@ Future<void> regenerateMessage(
|
||||
|
||||
final registerDeltaListener = createConversationDeltaRegistrar(ref);
|
||||
|
||||
// Check if model uses reasoning based on common naming patterns
|
||||
final modelLower = selectedModel.id.toLowerCase();
|
||||
final modelUsesReasoning =
|
||||
modelLower.contains('o1') ||
|
||||
modelLower.contains('o3') ||
|
||||
modelLower.contains('deepseek-r1') ||
|
||||
modelLower.contains('reasoning') ||
|
||||
modelLower.contains('think');
|
||||
|
||||
final activeStream = attachUnifiedChunkedStreaming(
|
||||
stream: stream,
|
||||
webSearchEnabled: webSearchEnabled,
|
||||
@@ -1676,6 +1685,11 @@ Future<void> regenerateMessage(
|
||||
updateMessageById: (messageId, updater) => ref
|
||||
.read(chatMessagesProvider.notifier)
|
||||
.updateMessageById(messageId, updater),
|
||||
modelUsesReasoning: modelUsesReasoning,
|
||||
toolsEnabled:
|
||||
selectedToolIds.isNotEmpty ||
|
||||
(toolServers != null && toolServers.isNotEmpty) ||
|
||||
imageGenerationEnabled,
|
||||
onChatTitleUpdated: (newTitle) {
|
||||
final active = ref.read(activeConversationProvider);
|
||||
if (active != null) {
|
||||
@@ -2258,6 +2272,15 @@ Future<void> _sendMessageInternal(
|
||||
|
||||
final registerDeltaListener = createConversationDeltaRegistrar(ref);
|
||||
|
||||
// Check if model uses reasoning based on common naming patterns
|
||||
final modelLower2 = selectedModel.id.toLowerCase();
|
||||
final modelUsesReasoning2 =
|
||||
modelLower2.contains('o1') ||
|
||||
modelLower2.contains('o3') ||
|
||||
modelLower2.contains('deepseek-r1') ||
|
||||
modelLower2.contains('reasoning') ||
|
||||
modelLower2.contains('think');
|
||||
|
||||
final activeStream = attachUnifiedChunkedStreaming(
|
||||
stream: stream,
|
||||
webSearchEnabled: webSearchEnabled,
|
||||
@@ -2292,6 +2315,11 @@ Future<void> _sendMessageInternal(
|
||||
updateMessageById: (messageId, updater) => ref
|
||||
.read(chatMessagesProvider.notifier)
|
||||
.updateMessageById(messageId, updater),
|
||||
modelUsesReasoning: modelUsesReasoning2,
|
||||
toolsEnabled:
|
||||
(toolIdsForApi != null && toolIdsForApi.isNotEmpty) ||
|
||||
(toolServers != null && toolServers.isNotEmpty) ||
|
||||
imageGenerationEnabled,
|
||||
onChatTitleUpdated: (newTitle) {
|
||||
final active = ref.read(activeConversationProvider);
|
||||
if (active != null) {
|
||||
|
||||
@@ -1,9 +1,12 @@
|
||||
import 'dart:async';
|
||||
import 'dart:io' show Platform;
|
||||
|
||||
import 'package:flutter/cupertino.dart';
|
||||
import 'package:flutter/material.dart';
|
||||
import 'package:flutter_riverpod/flutter_riverpod.dart';
|
||||
|
||||
import '../../../core/models/socket_health.dart';
|
||||
import '../../../core/services/socket_service.dart';
|
||||
import '../../../core/services/settings_service.dart';
|
||||
import '../../../shared/theme/theme_extensions.dart';
|
||||
import '../../../shared/theme/tweakcn_themes.dart';
|
||||
@@ -76,6 +79,8 @@ class AppCustomizationPage extends ConsumerWidget {
|
||||
_buildTtsDropdownSection(context, ref, settings),
|
||||
const SizedBox(height: Spacing.xl),
|
||||
_buildChatSection(context, ref, settings),
|
||||
const SizedBox(height: Spacing.xl),
|
||||
_buildSocketHealthSection(context, ref),
|
||||
],
|
||||
),
|
||||
),
|
||||
@@ -490,6 +495,29 @@ class AppCustomizationPage extends ConsumerWidget {
|
||||
);
|
||||
}
|
||||
|
||||
Widget _buildSocketHealthSection(BuildContext context, WidgetRef ref) {
|
||||
final theme = context.conduitTheme;
|
||||
final socketService = ref.watch(socketServiceProvider);
|
||||
|
||||
if (socketService == null) {
|
||||
return const SizedBox.shrink();
|
||||
}
|
||||
|
||||
return Column(
|
||||
crossAxisAlignment: CrossAxisAlignment.start,
|
||||
children: [
|
||||
Text(
|
||||
'Connection Health',
|
||||
style:
|
||||
theme.headingSmall?.copyWith(color: theme.sidebarForeground) ??
|
||||
TextStyle(color: theme.sidebarForeground, fontSize: 18),
|
||||
),
|
||||
const SizedBox(height: Spacing.sm),
|
||||
_SocketHealthCard(socketService: socketService),
|
||||
],
|
||||
);
|
||||
}
|
||||
|
||||
String _androidAssistantTriggerLabel(
|
||||
AppLocalizations l10n,
|
||||
AndroidAssistantTrigger trigger,
|
||||
@@ -2405,3 +2433,318 @@ class _ExpandableCardState extends State<_ExpandableCard>
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Widget that displays socket connection health with real-time updates.
|
||||
class _SocketHealthCard extends StatefulWidget {
|
||||
const _SocketHealthCard({required this.socketService});
|
||||
|
||||
final SocketService socketService;
|
||||
|
||||
@override
|
||||
State<_SocketHealthCard> createState() => _SocketHealthCardState();
|
||||
}
|
||||
|
||||
class _SocketHealthCardState extends State<_SocketHealthCard> {
|
||||
SocketHealth? _health;
|
||||
StreamSubscription<SocketHealth>? _subscription;
|
||||
|
||||
@override
|
||||
void initState() {
|
||||
super.initState();
|
||||
_initHealth();
|
||||
}
|
||||
|
||||
void _initHealth() {
|
||||
_health = widget.socketService.currentHealth;
|
||||
_subscription = widget.socketService.healthStream.listen((health) {
|
||||
if (mounted) {
|
||||
setState(() => _health = health);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@override
|
||||
void didUpdateWidget(covariant _SocketHealthCard oldWidget) {
|
||||
super.didUpdateWidget(oldWidget);
|
||||
if (oldWidget.socketService != widget.socketService) {
|
||||
_subscription?.cancel();
|
||||
_initHealth();
|
||||
}
|
||||
}
|
||||
|
||||
@override
|
||||
void dispose() {
|
||||
_subscription?.cancel();
|
||||
super.dispose();
|
||||
}
|
||||
|
||||
@override
|
||||
Widget build(BuildContext context) {
|
||||
final theme = context.conduitTheme;
|
||||
final health = _health;
|
||||
|
||||
if (health == null) {
|
||||
return ConduitCard(
|
||||
padding: const EdgeInsets.all(Spacing.md),
|
||||
child: Row(
|
||||
children: [
|
||||
Icon(
|
||||
Icons.cloud_off,
|
||||
color: theme.iconSecondary,
|
||||
size: IconSize.medium,
|
||||
),
|
||||
const SizedBox(width: Spacing.md),
|
||||
Text(
|
||||
'Not connected',
|
||||
style: theme.bodyMedium?.copyWith(color: theme.textSecondary),
|
||||
),
|
||||
],
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
final statusColor = health.isConnected ? theme.success : theme.error;
|
||||
final qualityColor = _getQualityColor(theme, health.quality);
|
||||
|
||||
return ConduitCard(
|
||||
padding: const EdgeInsets.all(Spacing.md),
|
||||
child: Column(
|
||||
crossAxisAlignment: CrossAxisAlignment.start,
|
||||
children: [
|
||||
// Connection Status Row
|
||||
Row(
|
||||
children: [
|
||||
Container(
|
||||
width: 40,
|
||||
height: 40,
|
||||
decoration: BoxDecoration(
|
||||
color: statusColor.withValues(alpha: 0.1),
|
||||
borderRadius: BorderRadius.circular(AppBorderRadius.small),
|
||||
border: Border.all(
|
||||
color: statusColor.withValues(alpha: 0.2),
|
||||
width: BorderWidth.thin,
|
||||
),
|
||||
),
|
||||
alignment: Alignment.center,
|
||||
child: Icon(
|
||||
health.isConnected ? Icons.cloud_done : Icons.cloud_off,
|
||||
color: statusColor,
|
||||
size: IconSize.medium,
|
||||
),
|
||||
),
|
||||
const SizedBox(width: Spacing.md),
|
||||
Expanded(
|
||||
child: Column(
|
||||
crossAxisAlignment: CrossAxisAlignment.start,
|
||||
children: [
|
||||
Text(
|
||||
health.isConnected ? 'Connected' : 'Disconnected',
|
||||
style: theme.bodyMedium?.copyWith(
|
||||
color: theme.sidebarForeground,
|
||||
fontWeight: FontWeight.w600,
|
||||
),
|
||||
),
|
||||
const SizedBox(height: Spacing.xxs),
|
||||
Text(
|
||||
_getTransportLabel(health.transport),
|
||||
style: theme.bodySmall?.copyWith(
|
||||
color: theme.sidebarForeground.withValues(alpha: 0.75),
|
||||
),
|
||||
),
|
||||
],
|
||||
),
|
||||
),
|
||||
// Connection quality indicator
|
||||
if (health.isConnected && health.hasLatencyInfo)
|
||||
Container(
|
||||
padding: const EdgeInsets.symmetric(
|
||||
horizontal: Spacing.sm,
|
||||
vertical: Spacing.xs,
|
||||
),
|
||||
decoration: BoxDecoration(
|
||||
color: qualityColor.withValues(alpha: 0.15),
|
||||
borderRadius: BorderRadius.circular(AppBorderRadius.small),
|
||||
border: Border.all(
|
||||
color: qualityColor.withValues(alpha: 0.3),
|
||||
width: BorderWidth.thin,
|
||||
),
|
||||
),
|
||||
child: Text(
|
||||
_getQualityLabel(health.quality),
|
||||
style: theme.bodySmall?.copyWith(
|
||||
color: qualityColor,
|
||||
fontWeight: FontWeight.w600,
|
||||
),
|
||||
),
|
||||
),
|
||||
],
|
||||
),
|
||||
if (health.isConnected) ...[
|
||||
const SizedBox(height: Spacing.md),
|
||||
const Divider(height: 1),
|
||||
const SizedBox(height: Spacing.md),
|
||||
// Metrics Grid
|
||||
Row(
|
||||
children: [
|
||||
Expanded(
|
||||
child: _MetricTile(
|
||||
icon: Icons.speed,
|
||||
label: 'Latency',
|
||||
value: health.hasLatencyInfo
|
||||
? '${health.latencyMs}ms'
|
||||
: '—',
|
||||
color: qualityColor,
|
||||
),
|
||||
),
|
||||
const SizedBox(width: Spacing.md),
|
||||
Expanded(
|
||||
child: _MetricTile(
|
||||
icon: Icons.refresh,
|
||||
label: 'Reconnects',
|
||||
value: '${health.reconnectCount}',
|
||||
color: health.reconnectCount > 0
|
||||
? theme.warning
|
||||
: theme.success,
|
||||
),
|
||||
),
|
||||
],
|
||||
),
|
||||
if (health.lastHeartbeat != null) ...[
|
||||
const SizedBox(height: Spacing.md),
|
||||
Row(
|
||||
children: [
|
||||
Icon(
|
||||
Icons.favorite,
|
||||
color: theme.error.withValues(alpha: 0.7),
|
||||
size: IconSize.small,
|
||||
),
|
||||
const SizedBox(width: Spacing.xs),
|
||||
Text(
|
||||
'Last heartbeat: ${_formatLastHeartbeat(health.lastHeartbeat!)}',
|
||||
style: theme.bodySmall?.copyWith(
|
||||
color: theme.sidebarForeground.withValues(alpha: 0.6),
|
||||
),
|
||||
),
|
||||
],
|
||||
),
|
||||
],
|
||||
],
|
||||
],
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
String _getTransportLabel(String transport) {
|
||||
switch (transport) {
|
||||
case 'websocket':
|
||||
return 'WebSocket transport';
|
||||
case 'polling':
|
||||
return 'HTTP polling transport';
|
||||
default:
|
||||
return 'Unknown transport';
|
||||
}
|
||||
}
|
||||
|
||||
String _getQualityLabel(String quality) {
|
||||
switch (quality) {
|
||||
case 'excellent':
|
||||
return 'Excellent';
|
||||
case 'good':
|
||||
return 'Good';
|
||||
case 'fair':
|
||||
return 'Fair';
|
||||
case 'poor':
|
||||
return 'Poor';
|
||||
default:
|
||||
return '—';
|
||||
}
|
||||
}
|
||||
|
||||
Color _getQualityColor(ConduitThemeExtension theme, String quality) {
|
||||
switch (quality) {
|
||||
case 'excellent':
|
||||
return theme.success;
|
||||
case 'good':
|
||||
return theme.success.withValues(alpha: 0.8);
|
||||
case 'fair':
|
||||
return theme.warning;
|
||||
case 'poor':
|
||||
return theme.error;
|
||||
default:
|
||||
return theme.textSecondary;
|
||||
}
|
||||
}
|
||||
|
||||
String _formatLastHeartbeat(DateTime lastHeartbeat) {
|
||||
final now = DateTime.now();
|
||||
final diff = now.difference(lastHeartbeat);
|
||||
|
||||
if (diff.inSeconds < 5) {
|
||||
return 'just now';
|
||||
} else if (diff.inSeconds < 60) {
|
||||
return '${diff.inSeconds}s ago';
|
||||
} else if (diff.inMinutes < 60) {
|
||||
return '${diff.inMinutes}m ago';
|
||||
} else {
|
||||
return '${diff.inHours}h ago';
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class _MetricTile extends StatelessWidget {
|
||||
const _MetricTile({
|
||||
required this.icon,
|
||||
required this.label,
|
||||
required this.value,
|
||||
required this.color,
|
||||
});
|
||||
|
||||
final IconData icon;
|
||||
final String label;
|
||||
final String value;
|
||||
final Color color;
|
||||
|
||||
@override
|
||||
Widget build(BuildContext context) {
|
||||
final theme = context.conduitTheme;
|
||||
|
||||
return Container(
|
||||
padding: const EdgeInsets.all(Spacing.sm),
|
||||
decoration: BoxDecoration(
|
||||
color: theme.cardBackground.withValues(alpha: 0.5),
|
||||
borderRadius: BorderRadius.circular(AppBorderRadius.small),
|
||||
border: Border.all(
|
||||
color: theme.cardBorder.withValues(alpha: 0.3),
|
||||
width: BorderWidth.thin,
|
||||
),
|
||||
),
|
||||
child: Row(
|
||||
children: [
|
||||
Icon(icon, color: color, size: IconSize.small),
|
||||
const SizedBox(width: Spacing.xs),
|
||||
Expanded(
|
||||
child: Column(
|
||||
crossAxisAlignment: CrossAxisAlignment.start,
|
||||
children: [
|
||||
Text(
|
||||
label,
|
||||
style: theme.bodySmall?.copyWith(
|
||||
color: theme.textSecondary,
|
||||
fontSize: 10,
|
||||
),
|
||||
),
|
||||
Text(
|
||||
value,
|
||||
style: theme.bodyMedium?.copyWith(
|
||||
color: color,
|
||||
fontWeight: FontWeight.w600,
|
||||
),
|
||||
),
|
||||
],
|
||||
),
|
||||
),
|
||||
],
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user