diff --git a/lib/core/services/streaming_helper.dart b/lib/core/services/streaming_helper.dart index c22f6af..17c411e 100644 --- a/lib/core/services/streaming_helper.dart +++ b/lib/core/services/streaming_helper.dart @@ -204,12 +204,13 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ final hasSocketSignals = socketService != null || registerDeltaListener != null; if (hasSocketSignals) { - // Use a reasonable inactivity timeout - if no data arrives for 45 seconds, - // something is likely wrong with the connection + // Use a short inactivity timeout - if no data arrives for 10 seconds, + // something is likely wrong with the connection. Combined with 1-second + // polling and server state sync, this provides fast recovery. socketWatchdog = InactivityWatchdog( - window: const Duration(seconds: 45), + window: const Duration(seconds: 10), // Absolute cap ensures streaming never gets stuck indefinitely - absoluteCap: const Duration(minutes: 10), + absoluteCap: const Duration(minutes: 5), onTimeout: () { DebugLogger.log( 'Socket watchdog timeout - finishing streaming gracefully', diff --git a/lib/features/chat/providers/chat_providers.dart b/lib/features/chat/providers/chat_providers.dart index e30e447..5c93338 100644 --- a/lib/features/chat/providers/chat_providers.dart +++ b/lib/features/chat/providers/chat_providers.dart @@ -109,6 +109,10 @@ class ChatMessagesNotifier extends Notifier> { bool _taskStatusCheckInFlight = false; bool _observedRemoteTask = false; + /// Counts consecutive polls where no tasks were observed. + /// Used to trigger fallback server check if task registration was missed. + int _noTaskPollCount = 0; + MarkdownStreamFormatter? _markdownFormatter; String? _activeStreamingMessageId; @@ -280,7 +284,9 @@ class ChatMessagesNotifier extends Notifier> { if (_taskStatusTimer != null) { return; } - _taskStatusTimer = Timer.periodic(const Duration(seconds: 5), (_) { + // Poll every second for fast recovery from missed socket events. + // This is a lightweight API call and provides the best UX for stuck streaming. + _taskStatusTimer = Timer.periodic(const Duration(seconds: 1), (_) { if (!_taskStatusCheckInFlight) { unawaited(_syncRemoteTaskStatus()); } @@ -295,6 +301,7 @@ class ChatMessagesNotifier extends Notifier> { _taskStatusTimer = null; _taskStatusCheckInFlight = false; _observedRemoteTask = false; + _noTaskPollCount = 0; } Future _syncRemoteTaskStatus() async { @@ -321,17 +328,26 @@ class ChatMessagesNotifier extends Notifier> { if (hasActiveTasks) { _observedRemoteTask = true; + _noTaskPollCount = 0; + } else { + _noTaskPollCount++; } // When no active tasks and we previously observed tasks, streaming should be done. - // Run secondary check to sync any missed content from server. final tasksDone = _observedRemoteTask && !hasActiveTasks; + // Fallback: If we've polled exactly 3 times without ever seeing tasks, + // something may be wrong - check server state directly. This catches cases + // where task registration was completely missed due to socket issues. + // Using == 3 instead of >= 3 ensures this only triggers once, not every poll. + final fallbackCheck = !_observedRemoteTask && _noTaskPollCount == 3; + // Secondary check: fetch conversation from server and compare message state // This catches cases where the done signal was missed AND syncs any missed content. - // Only run when tasks have actually completed (were observed and are now gone), - // not on every poll before tasks are registered. - if (_hasStreamingAssistant && tasksDone) { + // Runs when: + // 1. Tasks completed (were observed and are now gone), OR + // 2. Fallback: No tasks ever observed after exactly 3 polls (one-time check) + if (_hasStreamingAssistant && (tasksDone || fallbackCheck)) { try { final serverConversation = await api.getConversation( activeConversation.id,