diff --git a/lib/features/chat/providers/chat_providers.dart b/lib/features/chat/providers/chat_providers.dart index c69cd3d..40b871e 100644 --- a/lib/features/chat/providers/chat_providers.dart +++ b/lib/features/chat/providers/chat_providers.dart @@ -38,6 +38,8 @@ class ChatMessagesNotifier extends StateNotifier> { StreamSubscription? _messageStream; ProviderSubscription? _conversationListener; final List _subscriptions = []; + // Inactivity watchdog to prevent stuck typing indicator + Timer? _typingStuckGuard; ChatMessagesNotifier(this._ref) : super([]) { // Load messages when conversation changes with proper cleanup @@ -64,6 +66,8 @@ class ChatMessagesNotifier extends StateNotifier> { // Cancel any existing message stream when switching conversations _cancelMessageStream(); + // Also cancel typing guard on conversation switch + _cancelTypingGuard(); if (next != null) { state = next.messages; @@ -87,6 +91,132 @@ class ChatMessagesNotifier extends StateNotifier> { _messageStream = null; } + void _cancelTypingGuard() { + _typingStuckGuard?.cancel(); + _typingStuckGuard = null; + } + + void _scheduleTypingGuard({Duration? timeout}) { + // Default timeout tuned to balance long tool gaps and UX + final effectiveTimeout = timeout ?? const Duration(seconds: 25); + _typingStuckGuard?.cancel(); + _typingStuckGuard = Timer(effectiveTimeout, () async { + try { + if (state.isEmpty) return; + final last = state.last; + // Still the same streaming message and no finish signal + if (last.role == 'assistant' && last.isStreaming) { + // Attempt a soft recovery: if content is still empty, try fetching final content from server + if ((last.content).trim().isEmpty) { + try { + final apiSvc = _ref.read(apiServiceProvider); + final activeConv = _ref.read(activeConversationProvider); + final msgId = last.id; + final chatId = activeConv?.id; + if (apiSvc != null && chatId != null && chatId.isNotEmpty) { + final resp = await apiSvc.dio.get('/api/v1/chats/' + chatId); + final data = resp.data as Map; + String content = ''; + final chatObj = data['chat'] as Map?; + if (chatObj != null) { + final list = chatObj['messages']; + if (list is List) { + final target = list.firstWhere( + (m) => (m is Map && (m['id']?.toString() == msgId)), + orElse: () => null, + ); + if (target != null) { + final rawContent = (target as Map)['content']; + if (rawContent is String) { + content = rawContent; + } else if (rawContent is List) { + final textItem = rawContent.firstWhere( + (i) => i is Map && i['type'] == 'text', + orElse: () => null, + ); + if (textItem != null) { + content = (textItem as Map)['text']?.toString() ?? ''; + } + } + } + } + if (content.isEmpty) { + final history = chatObj['history']; + if (history is Map && history['messages'] is Map) { + final Map messagesMap = + (history['messages'] as Map).cast(); + final msg = messagesMap[msgId]; + if (msg is Map) { + final rawContent = msg['content']; + if (rawContent is String) { + content = rawContent; + } else if (rawContent is List) { + final textItem = rawContent.firstWhere( + (i) => i is Map && i['type'] == 'text', + orElse: () => null, + ); + if (textItem != null) { + content = (textItem as Map)['text']?.toString() ?? ''; + } + } + } + } + } + } + if (content.isNotEmpty) { + replaceLastMessageContent(content); + } + } + } catch (_) {} + } + // Regardless of fetch result, ensure UI is not stuck + finishStreaming(); + } + } finally { + _cancelTypingGuard(); + } + }); + } + + void _touchStreamingActivity() { + // Keep guard alive while streaming + if (state.isNotEmpty) { + final last = state.last; + if (last.role == 'assistant' && last.isStreaming) { + // Compute a dynamic timeout based on flow type + Duration timeout = const Duration(seconds: 25); + try { + final meta = last.metadata ?? const {}; + final isBgFlow = (meta['backgroundFlow'] == true); + final isWebSearchFlow = + (meta['webSearchFlow'] == true) || (meta['webSearchActive'] == true); + final isImageGenFlow = (meta['imageGenerationFlow'] == true); + + // Also consult global toggles if metadata not present + final globalWebSearch = _ref.read(webSearchEnabledProvider); + final webSearchAvailable = _ref.read(webSearchAvailableProvider); + final globalImageGen = _ref.read(imageGenerationEnabledProvider); + + if (isWebSearchFlow || (globalWebSearch && webSearchAvailable)) { + timeout = Duration(milliseconds: timeout.inMilliseconds.clamp(0, 45000)); + // If current < 45s, bump to 45s + if (timeout.inSeconds < 45) timeout = const Duration(seconds: 45); + } + if (isBgFlow) { + // Background tools/dynamic channel can be longer + if (timeout.inSeconds < 60) timeout = const Duration(seconds: 60); + } + if (isImageGenFlow || globalImageGen) { + // Image generation tends to be the longest + if (timeout.inSeconds < 90) timeout = const Duration(seconds: 90); + } + } catch (_) {} + + _scheduleTypingGuard(timeout: timeout); + } + } + } + // Public wrapper to cancel the currently active stream (used by Stop) void cancelActiveMessageStream() { _cancelMessageStream(); @@ -137,6 +267,9 @@ class ChatMessagesNotifier extends StateNotifier> { void addMessage(ChatMessage message) { state = [...state, message]; + if (message.role == 'assistant' && message.isStreaming) { + _touchStreamingActivity(); + } } void removeLastMessage() { @@ -176,6 +309,7 @@ class ChatMessagesNotifier extends StateNotifier> { ...state.sublist(0, state.length - 1), lastMessage.copyWith(content: sanitized(content)), ]; + _touchStreamingActivity(); } void updateLastMessageWithFunction( @@ -185,8 +319,11 @@ class ChatMessagesNotifier extends StateNotifier> { final lastMessage = state.last; if (lastMessage.role != 'assistant') return; - - state = [...state.sublist(0, state.length - 1), updater(lastMessage)]; + final updated = updater(lastMessage); + state = [...state.sublist(0, state.length - 1), updated]; + if (updated.isStreaming) { + _touchStreamingActivity(); + } } void appendToLastMessage(String content) { @@ -219,6 +356,7 @@ class ChatMessagesNotifier extends StateNotifier> { ...state.sublist(0, state.length - 1), lastMessage.copyWith(content: newContent), ]; + _touchStreamingActivity(); } void replaceLastMessageContent(String content) { @@ -245,6 +383,7 @@ class ChatMessagesNotifier extends StateNotifier> { ...state.sublist(0, state.length - 1), lastMessage.copyWith(content: sanitized), ]; + _touchStreamingActivity(); } void finishStreaming() { @@ -268,6 +407,7 @@ class ChatMessagesNotifier extends StateNotifier> { ...state.sublist(0, state.length - 1), lastMessage.copyWith(isStreaming: false, content: cleaned), ]; + _cancelTypingGuard(); } @override @@ -284,6 +424,8 @@ class ChatMessagesNotifier extends StateNotifier> { // Cancel message stream specifically _cancelMessageStream(); + // Cancel any active typing guard + _cancelTypingGuard(); // Cancel conversation listener specifically _conversationListener?.close(); @@ -1220,6 +1362,19 @@ Future _sendMessageInternal( isBackgroundToolsFlowPre || isBackgroundWebSearchPre || wantSessionBinding; bool suppressSocketContent = !isBackgroundFlow; // allow socket text when session-bound or tools bool usingDynamicChannel = false; // set true when server provides a channel + // Enrich the assistant placeholder metadata so the typing guard can use longer timeouts + try { + ref.read(chatMessagesProvider.notifier).updateLastMessageWithFunction((m) { + final mergedMeta = { + if (m.metadata != null) ...m.metadata!, + 'backgroundFlow': isBackgroundFlow, + if (isBackgroundWebSearchPre) 'webSearchFlow': true, + if (imageGenerationEnabled) 'imageGenerationFlow': true, + }; + return m.copyWith(metadata: mergedMeta); + }); + } catch (_) {} + if (socketService != null) { void chatHandler(Map ev) { try { @@ -1664,6 +1819,15 @@ Future _sendMessageInternal( socketService.offChatEvents(); socketService.offChannelEvents(); } catch (_) {} + // As a final safeguard, if we're still in streaming state, finish it to avoid stuck UI + try { + final msgs = ref.read(chatMessagesProvider); + if (msgs.isNotEmpty && + msgs.last.role == 'assistant' && + msgs.last.isStreaming) { + ref.read(chatMessagesProvider.notifier).finishStreaming(); + } + } catch (_) {} }); }