diff --git a/lib/core/services/streaming_helper.dart b/lib/core/services/streaming_helper.dart index a53226f..9583bb0 100644 --- a/lib/core/services/streaming_helper.dart +++ b/lib/core/services/streaming_helper.dart @@ -6,7 +6,6 @@ import 'package:flutter/material.dart'; import '../../core/models/chat_message.dart'; import '../../core/models/socket_event.dart'; import '../../core/services/socket_service.dart'; -import '../../core/utils/inactivity_watchdog.dart'; import '../../core/utils/tool_calls_parser.dart'; import 'navigation_service.dart'; import 'conversation_delta_listener.dart'; @@ -252,7 +251,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ onError: persistentController.addError, ); - InactivityWatchdog? socketWatchdog; // Socket subscriptions list - starts empty so non-socket flows can finish via onComplete. // HTTP subscription is tracked separately and cleaned up in disposeSocketSubscriptions. final socketSubscriptions = []; @@ -364,75 +362,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ } if (hasSocketSignals) { - // Adaptive inactivity timeout based on model capabilities. - // Reasoning models and tool-enabled flows need longer windows as they may - // have longer gaps between tokens during processing. - final watchdogWindow = modelUsesReasoning || toolsEnabled - ? const Duration(seconds: 30) // Longer for reasoning/tools - : const Duration(seconds: 15); // Standard for regular models - - final watchdogCap = modelUsesReasoning || toolsEnabled - ? const Duration(minutes: 10) // Longer cap for complex operations - : const Duration(minutes: 5); - - DebugLogger.log( - 'Initializing watchdog', - scope: 'streaming/helper', - data: { - 'windowSeconds': watchdogWindow.inSeconds, - 'capMinutes': watchdogCap.inMinutes, - 'modelUsesReasoning': modelUsesReasoning, - 'toolsEnabled': toolsEnabled, - }, - ); - - // Inactivity timeout - if no data arrives within window, poll server - // and finish streaming. This handles stuck connections (issue #172). - socketWatchdog = InactivityWatchdog( - window: watchdogWindow, - absoluteCap: watchdogCap, - onTimeout: () async { - DebugLogger.log( - 'Socket watchdog timeout - polling server', - scope: 'streaming/helper', - ); - - final result = await pollServerForMessage(); - if (result != null) { - final applied = applyServerContent( - result.content, - result.followUps, - finishIfDone: false, // We always finish on timeout - isDone: result.isDone, - source: 'Watchdog', - ); - if (applied) { - syncImages(); - } - } - - // Clean up and finish - try { - for (final dispose in socketSubscriptions) { - try { - dispose(); - } catch (_) {} - } - socketSubscriptions.clear(); - } catch (_) {} - - try { - final msgs = getMessages(); - if (msgs.isNotEmpty && - msgs.last.role == 'assistant' && - msgs.last.isStreaming) { - wrappedFinishStreaming(); - } - } catch (_) {} - socketWatchdog?.stop(); - }, - )..start(); - // Handle socket reconnection - update session IDs and check for missed events if (socketService != null) { StreamSubscription? reconnectSub; @@ -502,7 +431,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ pendingImageSignature = null; lastProcessedImageSignature = null; imageCollectionRequestId = 0; - socketWatchdog?.stop(); } bool isSearching = false; @@ -668,7 +596,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ try { if (line is String) { final s = line.trim(); - socketWatchdog?.ping(); // Enhanced completion detection matching OpenWebUI patterns if (s == '[DONE]' || s == 'DONE' || s == 'data: [DONE]') { try { @@ -687,7 +614,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ ); } catch (_) {} wrappedFinishStreaming(); - socketWatchdog?.stop(); return; } if (s.startsWith('data:')) { @@ -708,7 +634,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ ); } catch (_) {} wrappedFinishStreaming(); - socketWatchdog?.stop(); return; } try { @@ -763,13 +688,11 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ } } } else if (line is Map) { - socketWatchdog?.ping(); if (line['done'] == true) { try { socketService?.offEvent(channel); } catch (_) {} wrappedFinishStreaming(); - socketWatchdog?.stop(); return; } } @@ -779,7 +702,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ try { socketService?.onEvent(channel, handler); } catch (_) {} - socketWatchdog?.ping(); // Increased timeout to match our more generous streaming timeouts // OpenWebUI doesn't have such aggressive channel timeouts // Use Timer instead of Future.delayed so it can be cancelled on cleanup @@ -787,9 +709,8 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ try { socketService?.offEvent(channel); } catch (_) {} - socketWatchdog?.stop(); }); - // Register cleanup so the timer is cancelled if streaming completes early + // Register cleanup for socket subscriptions socketSubscriptions.add(() { channelTimeoutTimer.cancel(); }); @@ -815,7 +736,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ } final payload = data['data']; final messageId = ev['message_id']?.toString(); - socketWatchdog?.ping(); if (kSocketVerboseLogging && payload is Map) { DebugLogger.log( @@ -998,7 +918,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ } } wrappedFinishStreaming(); - socketWatchdog?.stop(); } } } else if (type == 'status' && payload != null) { @@ -1177,7 +1096,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ }); // Ensure UI exits streaming state wrappedFinishStreaming(); - socketWatchdog?.stop(); } else if ((type == 'chat:message:delta' || type == 'message') && payload != null) { // Incremental message content over socket @@ -1366,7 +1284,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ requireFocus: false, ), onDelta: (event) { - socketWatchdog?.ping(); chatHandler(event.raw, event.ack); }, onError: (error, stackTrace) { @@ -1396,7 +1313,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ requireFocus: false, ), onDelta: (event) { - socketWatchdog?.ping(); channelEventsHandler(event.raw, event.ack); }, onError: (error, stackTrace) { @@ -1511,13 +1427,9 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ if (connected) { DebugLogger.log( - 'Socket recovery successful - restarting watchdog', + 'Socket recovery successful', scope: 'streaming/helper', ); - // Restart watchdog instead of stopping it - this ensures we - // still have a timeout mechanism if socket recovery succeeds - // but events don't resume (fixes premature watchdog stop bug) - socketWatchdog?.ping(); return; } } catch (e) { @@ -1531,14 +1443,13 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ disposeSocketSubscriptions(); wrappedFinishStreaming(); Future.microtask(refreshConversationSnapshot); - socketWatchdog?.stop(); }, ); return ActiveSocketStream( controller: controller, socketSubscriptions: socketSubscriptions, - disposeWatchdog: () => socketWatchdog?.stop(), + disposeWatchdog: () {}, ); } diff --git a/lib/features/chat/providers/chat_providers.dart b/lib/features/chat/providers/chat_providers.dart index c123b15..7719b27 100644 --- a/lib/features/chat/providers/chat_providers.dart +++ b/lib/features/chat/providers/chat_providers.dart @@ -109,10 +109,6 @@ class ChatMessagesNotifier extends Notifier> { bool _taskStatusCheckInFlight = false; bool _observedRemoteTask = false; - /// Counts consecutive polls where no tasks were observed. - /// Used to trigger fallback server check if task registration was missed. - int _noTaskPollCount = 0; - MarkdownStreamFormatter? _markdownFormatter; String? _activeStreamingMessageId; @@ -301,7 +297,6 @@ class ChatMessagesNotifier extends Notifier> { _taskStatusTimer = null; _taskStatusCheckInFlight = false; _observedRemoteTask = false; - _noTaskPollCount = 0; } Future _syncRemoteTaskStatus() async { @@ -328,26 +323,20 @@ class ChatMessagesNotifier extends Notifier> { if (hasActiveTasks) { _observedRemoteTask = true; - _noTaskPollCount = 0; - } else { - _noTaskPollCount++; } // When no active tasks and we previously observed tasks, streaming should be done. final tasksDone = _observedRemoteTask && !hasActiveTasks; - // Fallback: If we've polled exactly 3 times without ever seeing tasks, - // something may be wrong - check server state directly. This catches cases - // where task registration was completely missed due to socket issues. - // Using == 3 instead of >= 3 ensures this only triggers once, not every poll. - final fallbackCheck = !_observedRemoteTask && _noTaskPollCount == 3; - - // Secondary check: fetch conversation from server and compare message state - // This catches cases where the done signal was missed AND syncs any missed content. - // Runs when: - // 1. Tasks completed (were observed and are now gone), OR - // 2. Fallback: No tasks ever observed after exactly 3 polls (one-time check) - if (_hasStreamingAssistant && (tasksDone || fallbackCheck)) { + // Secondary check: fetch conversation from server and compare message state. + // This catches cases where the done signal was missed AND syncs any missed + // content. Only runs when tasks have genuinely completed (were observed and + // are now gone). We intentionally avoid any timed fallback checks here + // because they conflict with legitimate slow task registration scenarios + // like web search, which can take a long time to start on the server. + // Note: If a socket connection silently fails before tasks complete, the + // user can cancel via the stop button or navigate away to recover. + if (_hasStreamingAssistant && tasksDone) { try { final serverConversation = await api.getConversation( activeConversation.id,