From 30f1650faf434ef18ec8ebd95514b30da65835cb Mon Sep 17 00:00:00 2001 From: cogwheel0 <172976095+cogwheel0@users.noreply.github.com> Date: Sun, 7 Sep 2025 23:17:26 +0530 Subject: [PATCH] feat: inactivity watchdog for sockets --- .../persistent_streaming_service.dart | 4 +- lib/core/utils/inactivity_watchdog.dart | 83 ++++++ .../chat/providers/chat_providers.dart | 265 +++++++++++------- 3 files changed, 242 insertions(+), 110 deletions(-) create mode 100644 lib/core/utils/inactivity_watchdog.dart diff --git a/lib/core/services/persistent_streaming_service.dart b/lib/core/services/persistent_streaming_service.dart index 58bdf34..63250e9 100644 --- a/lib/core/services/persistent_streaming_service.dart +++ b/lib/core/services/persistent_streaming_service.dart @@ -333,7 +333,9 @@ class PersistentStreamingService with WidgetsBindingObserver { final lastUpdate = metadata['lastUpdate'] as DateTime?; if (lastUpdate != null) { final timeSinceUpdate = DateTime.now().difference(lastUpdate); - return timeSinceUpdate > const Duration(minutes: 1); + // Align with app-side watchdogs: be less aggressive than UI guard + // but still attempt recovery before server timeouts become likely. + return timeSinceUpdate > const Duration(minutes: 2); } return false; diff --git a/lib/core/utils/inactivity_watchdog.dart b/lib/core/utils/inactivity_watchdog.dart new file mode 100644 index 0000000..00453ac --- /dev/null +++ b/lib/core/utils/inactivity_watchdog.dart @@ -0,0 +1,83 @@ +import 'dart:async'; + +/// A simple activity-based watchdog. +/// +/// Call [ping] whenever activity occurs. If no activity happens +/// within [window], [onTimeout] fires. Optionally, an [absoluteCap] +/// enforces a maximum total duration regardless of activity. +class InactivityWatchdog { + InactivityWatchdog({ + required Duration window, + required this.onTimeout, + Duration? absoluteCap, + }) : _window = window, + _absoluteCap = absoluteCap; + + final void Function() onTimeout; + + Duration _window; + Duration? _absoluteCap; + Timer? _timer; + Timer? _absoluteTimer; + bool _started = false; + + Duration get window => _window; + + void setWindow(Duration newWindow) { + _window = newWindow; + if (_started) { + // Restart timer with new window + _restart(); + } + } + + void setAbsoluteCap(Duration? cap) { + _absoluteCap = cap; + if (_started) { + _absoluteTimer?.cancel(); + if (_absoluteCap != null) { + _absoluteTimer = Timer(_absoluteCap!, _fire); + } + } + } + + void start() { + if (_started) return; + _started = true; + _restart(); + if (_absoluteCap != null) { + _absoluteTimer = Timer(_absoluteCap!, _fire); + } + } + + void ping() { + if (!_started) { + start(); + return; + } + _restart(); + } + + void stop() { + _timer?.cancel(); + _timer = null; + _absoluteTimer?.cancel(); + _absoluteTimer = null; + _started = false; + } + + void dispose() => stop(); + + void _restart() { + _timer?.cancel(); + _timer = Timer(_window, _fire); + } + + void _fire() { + stop(); + try { + onTimeout(); + } catch (_) {} + } +} + diff --git a/lib/features/chat/providers/chat_providers.dart b/lib/features/chat/providers/chat_providers.dart index 8cf4f98..664d478 100644 --- a/lib/features/chat/providers/chat_providers.dart +++ b/lib/features/chat/providers/chat_providers.dart @@ -13,6 +13,7 @@ import '../../../core/auth/auth_state_manager.dart'; import '../../../core/utils/stream_chunker.dart'; import '../../../core/services/persistent_streaming_service.dart'; import '../../../core/utils/debug_logger.dart'; +import '../../../core/utils/inactivity_watchdog.dart'; import '../services/reviewer_mode_service.dart'; import '../../../shared/services/tasks/task_queue.dart'; import '../../tools/providers/tools_providers.dart'; @@ -40,8 +41,8 @@ class ChatMessagesNotifier extends StateNotifier> { StreamSubscription? _messageStream; ProviderSubscription? _conversationListener; final List _subscriptions = []; - // Inactivity watchdog to prevent stuck typing indicator - Timer? _typingStuckGuard; + // Activity-based watchdog to prevent stuck typing indicator + InactivityWatchdog? _typingWatchdog; ChatMessagesNotifier(this._ref) : super([]) { // Load messages when conversation changes with proper cleanup @@ -115,62 +116,42 @@ class ChatMessagesNotifier extends StateNotifier> { } void _cancelTypingGuard() { - _typingStuckGuard?.cancel(); - _typingStuckGuard = null; + _typingWatchdog?.stop(); + _typingWatchdog = null; } void _scheduleTypingGuard({Duration? timeout}) { // Default timeout tuned to balance long tool gaps and UX final effectiveTimeout = timeout ?? const Duration(seconds: 25); - _typingStuckGuard?.cancel(); - _typingStuckGuard = Timer(effectiveTimeout, () async { - try { - if (state.isEmpty) return; - final last = state.last; - // Still the same streaming message and no finish signal - if (last.role == 'assistant' && last.isStreaming) { - // Attempt a soft recovery: if content is still empty, try fetching final content from server - if ((last.content).trim().isEmpty) { - try { - final apiSvc = _ref.read(apiServiceProvider); - final activeConv = _ref.read(activeConversationProvider); - final msgId = last.id; - final chatId = activeConv?.id; - if (apiSvc != null && chatId != null && chatId.isNotEmpty) { - final resp = await apiSvc.dio.get('/api/v1/chats/' + chatId); - final data = resp.data as Map; - String content = ''; - final chatObj = data['chat'] as Map?; - if (chatObj != null) { - final list = chatObj['messages']; - if (list is List) { - final target = list.firstWhere( - (m) => (m is Map && (m['id']?.toString() == msgId)), - orElse: () => null, - ); - if (target != null) { - final rawContent = (target as Map)['content']; - if (rawContent is String) { - content = rawContent; - } else if (rawContent is List) { - final textItem = rawContent.firstWhere( - (i) => i is Map && i['type'] == 'text', - orElse: () => null, - ); - if (textItem != null) { - content = (textItem as Map)['text']?.toString() ?? ''; - } - } - } - } - if (content.isEmpty) { - final history = chatObj['history']; - if (history is Map && history['messages'] is Map) { - final Map messagesMap = - (history['messages'] as Map).cast(); - final msg = messagesMap[msgId]; - if (msg is Map) { - final rawContent = msg['content']; + _typingWatchdog ??= InactivityWatchdog( + window: effectiveTimeout, + onTimeout: () async { + try { + if (state.isEmpty) return; + final last = state.last; + // Still the same streaming message and no finish signal + if (last.role == 'assistant' && last.isStreaming) { + // Attempt a soft recovery: if content is still empty, try fetching final content from server + if ((last.content).trim().isEmpty) { + try { + final apiSvc = _ref.read(apiServiceProvider); + final activeConv = _ref.read(activeConversationProvider); + final msgId = last.id; + final chatId = activeConv?.id; + if (apiSvc != null && chatId != null && chatId.isNotEmpty) { + final resp = await apiSvc.dio.get('/api/v1/chats/' + chatId); + final data = resp.data as Map; + String content = ''; + final chatObj = data['chat'] as Map?; + if (chatObj != null) { + final list = chatObj['messages']; + if (list is List) { + final target = list.firstWhere( + (m) => (m is Map && (m['id']?.toString() == msgId)), + orElse: () => null, + ); + if (target != null) { + final rawContent = (target as Map)['content']; if (rawContent is String) { content = rawContent; } else if (rawContent is List) { @@ -179,27 +160,52 @@ class ChatMessagesNotifier extends StateNotifier> { orElse: () => null, ); if (textItem != null) { - content = - (textItem as Map)['text']?.toString() ?? ''; + content = (textItem as Map)['text']?.toString() ?? ''; + } + } + } + } + if (content.isEmpty) { + final history = chatObj['history']; + if (history is Map && history['messages'] is Map) { + final Map messagesMap = + (history['messages'] as Map) + .cast(); + final msg = messagesMap[msgId]; + if (msg is Map) { + final rawContent = msg['content']; + if (rawContent is String) { + content = rawContent; + } else if (rawContent is List) { + final textItem = rawContent.firstWhere( + (i) => i is Map && i['type'] == 'text', + orElse: () => null, + ); + if (textItem != null) { + content = + (textItem as Map)['text']?.toString() ?? ''; + } } } } } } + if (content.isNotEmpty) { + replaceLastMessageContent(content); + } } - if (content.isNotEmpty) { - replaceLastMessageContent(content); - } - } - } catch (_) {} + } catch (_) {} + } + // Regardless of fetch result, ensure UI is not stuck + finishStreaming(); } - // Regardless of fetch result, ensure UI is not stuck - finishStreaming(); + } finally { + _cancelTypingGuard(); } - } finally { - _cancelTypingGuard(); - } - }); + }, + ); + _typingWatchdog!.setWindow(effectiveTimeout); + _typingWatchdog!.ping(); } void _touchStreamingActivity() { @@ -222,20 +228,17 @@ class ChatMessagesNotifier extends StateNotifier> { final webSearchAvailable = _ref.read(webSearchAvailableProvider); final globalImageGen = _ref.read(imageGenerationEnabledProvider); + // Extend guard windows to tolerate long reasoning/tools (> 1 min) if (isWebSearchFlow || (globalWebSearch && webSearchAvailable)) { - timeout = Duration( - milliseconds: timeout.inMilliseconds.clamp(0, 45000), - ); - // If current < 45s, bump to 45s - if (timeout.inSeconds < 45) timeout = const Duration(seconds: 45); + if (timeout.inSeconds < 60) timeout = const Duration(seconds: 60); } if (isBgFlow) { - // Background tools/dynamic channel can be longer - if (timeout.inSeconds < 60) timeout = const Duration(seconds: 60); + // Background tools/dynamic channel can be much longer + if (timeout.inSeconds < 120) timeout = const Duration(seconds: 120); } if (isImageGenFlow || globalImageGen) { // Image generation tends to be the longest - if (timeout.inSeconds < 90) timeout = const Duration(seconds: 90); + if (timeout.inSeconds < 180) timeout = const Duration(seconds: 180); } } catch (_) {} @@ -762,7 +765,7 @@ Future regenerateMessage( final bool isLastUser = (i == messages.length - 1) && msg.role == 'user'; final List messageAttachments = (isLastUser && (attachments != null && attachments.isNotEmpty)) - ? List.from(attachments!) + ? List.from(attachments) : (msg.attachmentIds ?? const []); if (messageAttachments.isNotEmpty) { @@ -1468,6 +1471,26 @@ Future _sendMessageInternal( } catch (_) {} if (socketService != null) { + // Activity-based watchdog for chat/channel events (resets on activity) + final _chatWatchdog = InactivityWatchdog( + window: const Duration(minutes: 5), + onTimeout: () { + try { + socketService.offChatEvents(); + socketService.offChannelEvents(); + } catch (_) {} + // As a final safeguard, if we're still in streaming state, finish it + try { + final msgs = ref.read(chatMessagesProvider); + if (msgs.isNotEmpty && + msgs.last.role == 'assistant' && + msgs.last.isStreaming) { + ref.read(chatMessagesProvider.notifier).finishStreaming(); + } + } catch (_) {} + }, + )..start(); + void chatHandler(Map ev) { try { final data = ev['data']; @@ -1475,6 +1498,9 @@ Future _sendMessageInternal( final type = data['type']; final payload = data['data']; DebugLogger.stream('Socket chat-events: type=$type'); + // Any chat event indicates activity; reset inactivity watchdog + // (watchdog defined below, near handler registration) + _chatWatchdog.ping(); if (type == 'chat:completion' && payload != null) { if (payload is Map) { // Provider may emit tool_calls at the top level @@ -1591,6 +1617,10 @@ Future _sendMessageInternal( try { socketService.offChatEvents(); } catch (_) {} + try { + _chatWatchdog.ping(); // ensure timer exists + _chatWatchdog.stop(); + } catch (_) {} // Notify server that chat is completed (mirrors web client) try { @@ -1703,6 +1733,9 @@ Future _sendMessageInternal( } // Normal path: finish now ref.read(chatMessagesProvider.notifier).finishStreaming(); + try { + _chatWatchdog.stop(); + } catch (_) {} } } } else if (type == 'request:chat:completion' && payload != null) { @@ -1722,6 +1755,10 @@ Future _sendMessageInternal( try { if (line is String) { final s = line.trim(); + // Dynamic channel activity + try { + _chatWatchdog.ping(); + } catch (_) {} DebugLogger.stream( 'Socket [$channel] line=${s.length > 160 ? '${s.substring(0, 160)}…' : s}', ); @@ -1988,27 +2025,15 @@ Future _sendMessageInternal( .read(chatMessagesProvider.notifier) .appendToLastMessage(content); _updateImagesFromCurrentContent(ref); + _chatWatchdog.ping(); } } } catch (_) {} } socketService.onChannelEvents(channelEventsHandler); - Future.delayed(const Duration(seconds: 90), () { - try { - socketService.offChatEvents(); - socketService.offChannelEvents(); - } catch (_) {} - // As a final safeguard, if we're still in streaming state, finish it to avoid stuck UI - try { - final msgs = ref.read(chatMessagesProvider); - if (msgs.isNotEmpty && - msgs.last.role == 'assistant' && - msgs.last.isStreaming) { - ref.read(chatMessagesProvider.notifier).finishStreaming(); - } - } catch (_) {} - }); + // Start activity watchdog + _chatWatchdog.ping(); } // Prepare streaming and background handling @@ -2063,8 +2088,17 @@ Future _sendMessageInternal( // Helpers were defined above + int _chunkSeq = 0; final streamSubscription = persistentController.stream.listen( (chunk) { + _chunkSeq += 1; + try { + persistentService.updateStreamProgress( + streamId, + chunkSequence: _chunkSeq, + appendedContent: chunk, + ); + } catch (_) {} var effectiveChunk = chunk; // Check for web search indicators in the stream if (webSearchEnabled && !isSearching) { @@ -2960,11 +2994,32 @@ void _attachSocketStreamingHandlers({ final api = ref.read(apiServiceProvider); + // Activity-based watchdog for socket-driven streaming (resets on activity) + final _socketWatchdog = InactivityWatchdog( + window: const Duration(minutes: 5), + onTimeout: () { + try { + socketService.offChatEvents(); + socketService.offChannelEvents(); + } catch (_) {} + try { + final msgs = ref.read(chatMessagesProvider); + if (msgs.isNotEmpty && + msgs.last.role == 'assistant' && + msgs.last.isStreaming) { + ref.read(chatMessagesProvider.notifier).finishStreaming(); + } + } catch (_) {} + }, + )..start(); + void channelLineHandlerFactory(String channel) { void handler(dynamic line) { try { if (line is String) { final s = line.trim(); + // Any socket line is activity + _socketWatchdog.ping(); if (s == '[DONE]' || s == 'DONE') { try { socketService.offEvent(channel); @@ -2982,6 +3037,7 @@ void _attachSocketStreamingHandlers({ ); } catch (_) {} ref.read(chatMessagesProvider.notifier).finishStreaming(); + _socketWatchdog.stop(); return; } if (s.startsWith('data:')) { @@ -3003,6 +3059,7 @@ void _attachSocketStreamingHandlers({ ); } catch (_) {} ref.read(chatMessagesProvider.notifier).finishStreaming(); + _socketWatchdog.stop(); return; } try { @@ -3065,11 +3122,13 @@ void _attachSocketStreamingHandlers({ } } } else if (line is Map) { + _socketWatchdog.ping(); if (line['done'] == true) { try { socketService.offEvent(channel); } catch (_) {} ref.read(chatMessagesProvider.notifier).finishStreaming(); + _socketWatchdog.stop(); return; } } @@ -3077,11 +3136,8 @@ void _attachSocketStreamingHandlers({ } socketService.onEvent(channel, handler); - Future.delayed(const Duration(minutes: 3), () { - try { - socketService.offEvent(channel); - } catch (_) {} - }); + // Start activity watchdog now that handler is attached + _socketWatchdog.ping(); } void chatHandler(Map ev) { @@ -3175,6 +3231,9 @@ void _attachSocketStreamingHandlers({ try { socketService.offChatEvents(); } catch (_) {} + try { + _socketWatchdog.stop(); + } catch (_) {} try { unawaited( api @@ -3326,20 +3385,8 @@ void _attachSocketStreamingHandlers({ socketService.onChatEvents(chatHandler); socketService.onChannelEvents(channelEventsHandler); - Future.delayed(const Duration(seconds: 90), () { - try { - socketService.offChatEvents(); - socketService.offChannelEvents(); - } catch (_) {} - try { - final msgs = ref.read(chatMessagesProvider); - if (msgs.isNotEmpty && - msgs.last.role == 'assistant' && - msgs.last.isStreaming) { - ref.read(chatMessagesProvider.notifier).finishStreaming(); - } - } catch (_) {} - }); + // Start activity watchdog for chat/channel events + _socketWatchdog.ping(); } // ========== Tool Servers (OpenAPI) Helpers ==========