diff --git a/lib/features/chat/providers/chat_providers.dart b/lib/features/chat/providers/chat_providers.dart index 9624e49..cca739f 100644 --- a/lib/features/chat/providers/chat_providers.dart +++ b/lib/features/chat/providers/chat_providers.dart @@ -15,7 +15,6 @@ import '../../../core/services/conversation_delta_listener.dart'; import '../../../core/services/streaming_helper.dart'; import '../../../core/services/streaming_response_controller.dart'; import '../../../core/utils/debug_logger.dart'; -import '../../../core/utils/inactivity_watchdog.dart'; import '../../../core/utils/markdown_stream_formatter.dart'; import '../../../core/utils/tool_calls_parser.dart'; import '../../../shared/services/tasks/task_queue.dart'; @@ -83,9 +82,10 @@ class ChatMessagesNotifier extends Notifier> { final List _subscriptions = []; final List _socketSubscriptions = []; VoidCallback? _socketTeardown; - // Activity-based watchdog to prevent stuck typing indicator - InactivityWatchdog? _typingWatchdog; DateTime? _lastStreamingActivity; + Timer? _taskStatusTimer; + bool _taskStatusCheckInFlight = false; + bool _observedRemoteTask = false; MarkdownStreamFormatter? _markdownFormatter; String? _activeStreamingMessageId; @@ -143,16 +143,20 @@ class ChatMessagesNotifier extends Notifier> { // Cancel any existing message stream when switching conversations _cancelMessageStream(); - // Also cancel typing guard on conversation switch - _cancelTypingGuard(); + _stopRemoteTaskMonitor(); if (next != null) { state = next.messages; // Update selected model if conversation has a different model _updateModelForConversation(next); + + if (_hasStreamingAssistant) { + _ensureRemoteTaskMonitor(); + } } else { state = []; + _stopRemoteTaskMonitor(); } }); @@ -163,8 +167,7 @@ class ChatMessagesNotifier extends Notifier> { _subscriptions.clear(); _cancelMessageStream(); - cancelSocketSubscriptions(); - _cancelTypingGuard(); + _stopRemoteTaskMonitor(); _conversationListener?.close(); _conversationListener = null; @@ -183,11 +186,7 @@ class ChatMessagesNotifier extends Notifier> { } _clearStreamingFormatter(); cancelSocketSubscriptions(); - } - - void _cancelTypingGuard() { - _typingWatchdog?.stop(); - _typingWatchdog = null; + _stopRemoteTaskMonitor(); } void _clearStreamingFormatter() { @@ -195,6 +194,69 @@ class ChatMessagesNotifier extends Notifier> { _activeStreamingMessageId = null; } + bool get _hasStreamingAssistant { + if (state.isEmpty) return false; + final last = state.last; + return last.role == 'assistant' && last.isStreaming; + } + + void _ensureRemoteTaskMonitor() { + if (_taskStatusTimer != null) { + return; + } + _taskStatusTimer = Timer.periodic(const Duration(seconds: 5), (_) { + if (!_taskStatusCheckInFlight) { + unawaited(_syncRemoteTaskStatus()); + } + }); + if (!_taskStatusCheckInFlight) { + unawaited(_syncRemoteTaskStatus()); + } + } + + void _stopRemoteTaskMonitor() { + _taskStatusTimer?.cancel(); + _taskStatusTimer = null; + _taskStatusCheckInFlight = false; + _observedRemoteTask = false; + } + + Future _syncRemoteTaskStatus() async { + if (_taskStatusCheckInFlight) { + return; + } + if (!_hasStreamingAssistant) { + _stopRemoteTaskMonitor(); + return; + } + + final api = ref.read(apiServiceProvider); + final activeConversation = ref.read(activeConversationProvider); + if (api == null || activeConversation == null) { + _stopRemoteTaskMonitor(); + return; + } + + _taskStatusCheckInFlight = true; + try { + final taskIds = await api.getTaskIdsByChat(activeConversation.id); + if (taskIds.isEmpty) { + if (_observedRemoteTask && _hasStreamingAssistant) { + finishStreaming(); + } else if (!_observedRemoteTask) { + // No tasks reported yet; keep monitoring to allow registration. + } + } else { + _observedRemoteTask = true; + } + } catch (err, stack) { + DebugLogger.log('Task status poll failed: $err', scope: 'chat/provider'); + debugPrintStack(stackTrace: stack); + } finally { + _taskStatusCheckInFlight = false; + } + } + void _ensureFormatterForMessage(ChatMessage message) { if (_markdownFormatter != null && _activeStreamingMessageId == message.id) { return; @@ -231,132 +293,16 @@ class ChatMessagesNotifier extends Notifier> { return fallback; } - void _scheduleTypingGuard({Duration? timeout}) { - // Default timeout tuned to balance long tool gaps and UX - final effectiveTimeout = timeout ?? const Duration(seconds: 25); - _typingWatchdog ??= InactivityWatchdog( - window: effectiveTimeout, - onTimeout: () async { - try { - if (state.isEmpty) return; - final last = state.last; - // Still the same streaming message and no finish signal - if (last.role == 'assistant' && last.isStreaming) { - // Attempt a soft recovery: if content is still empty, try fetching final content from server - if ((last.content).trim().isEmpty) { - try { - final apiSvc = ref.read(apiServiceProvider); - final activeConv = ref.read(activeConversationProvider); - final msgId = last.id; - final chatId = activeConv?.id; - if (apiSvc != null && chatId != null && chatId.isNotEmpty) { - final resp = await apiSvc.dio.get('/api/v1/chats/$chatId'); - final data = resp.data as Map; - String content = ''; - final chatObj = data['chat'] as Map?; - if (chatObj != null) { - final list = chatObj['messages']; - if (list is List) { - final target = list.firstWhere( - (m) => (m is Map && (m['id']?.toString() == msgId)), - orElse: () => null, - ); - if (target != null) { - final rawContent = (target as Map)['content']; - if (rawContent is String) { - content = rawContent; - } else if (rawContent is List) { - final textItem = rawContent.firstWhere( - (i) => i is Map && i['type'] == 'text', - orElse: () => null, - ); - if (textItem != null) { - content = - (textItem as Map)['text']?.toString() ?? ''; - } - } - } - } - if (content.isEmpty) { - final history = chatObj['history']; - if (history is Map && history['messages'] is Map) { - final Map messagesMap = - (history['messages'] as Map) - .cast(); - final msg = messagesMap[msgId]; - if (msg is Map) { - final rawContent = msg['content']; - if (rawContent is String) { - content = rawContent; - } else if (rawContent is List) { - final textItem = rawContent.firstWhere( - (i) => i is Map && i['type'] == 'text', - orElse: () => null, - ); - if (textItem != null) { - content = - (textItem as Map)['text']?.toString() ?? ''; - } - } - } - } - } - } - if (content.isNotEmpty) { - replaceLastMessageContent(content); - } - } - } catch (_) {} - } - // Regardless of fetch result, ensure UI is not stuck - finishStreaming(); - } - } finally { - _cancelTypingGuard(); - } - }, - ); - _typingWatchdog!.setWindow(effectiveTimeout); - _typingWatchdog!.ping(); - } - void _touchStreamingActivity() { _lastStreamingActivity = DateTime.now(); - // Keep guard alive while streaming - if (state.isNotEmpty) { - final last = state.last; - if (last.role == 'assistant' && last.isStreaming) { - // Compute a dynamic timeout based on flow type - Duration timeout = const Duration(seconds: 25); - try { - final meta = last.metadata ?? const {}; - final isBgFlow = (meta['backgroundFlow'] == true); - final isWebSearchFlow = - (meta['webSearchFlow'] == true) || - (meta['webSearchActive'] == true); - final isImageGenFlow = (meta['imageGenerationFlow'] == true); - - // Also consult global toggles if metadata not present - final globalWebSearch = ref.read(webSearchEnabledProvider); - final webSearchAvailable = ref.read(webSearchAvailableProvider); - final globalImageGen = ref.read(imageGenerationEnabledProvider); - - // Extend guard windows to tolerate long reasoning/tools (> 1 min) - if (isWebSearchFlow || (globalWebSearch && webSearchAvailable)) { - if (timeout.inSeconds < 60) timeout = const Duration(seconds: 60); - } - if (isBgFlow) { - // Background tools/dynamic channel can be much longer - if (timeout.inSeconds < 120) timeout = const Duration(seconds: 120); - } - if (isImageGenFlow || globalImageGen) { - // Image generation tends to be the longest - if (timeout.inSeconds < 180) timeout = const Duration(seconds: 180); - } - } catch (_) {} - - _scheduleTypingGuard(timeout: timeout); + if (_hasStreamingAssistant) { + // Reset observed flag each time a new streaming session starts. + if (_taskStatusTimer == null) { + _observedRemoteTask = false; } + _ensureRemoteTaskMonitor(); + } else { + _stopRemoteTaskMonitor(); } } @@ -641,7 +587,7 @@ class ChatMessagesNotifier extends Notifier> { lastMessage.copyWith(isStreaming: false, content: cleaned), ]; _messageStream = null; - _cancelTypingGuard(); + _stopRemoteTaskMonitor(); // Trigger a refresh of the conversations list so UI like the Chats Drawer // can pick up updated titles and ordering once streaming completes.