refactor(streaming): Remove inactivity watchdog and related code

This commit is contained in:
cogwheel0
2025-12-07 10:32:33 +05:30
parent b271d1987b
commit 324217252e
2 changed files with 12 additions and 112 deletions

View File

@@ -6,7 +6,6 @@ import 'package:flutter/material.dart';
import '../../core/models/chat_message.dart'; import '../../core/models/chat_message.dart';
import '../../core/models/socket_event.dart'; import '../../core/models/socket_event.dart';
import '../../core/services/socket_service.dart'; import '../../core/services/socket_service.dart';
import '../../core/utils/inactivity_watchdog.dart';
import '../../core/utils/tool_calls_parser.dart'; import '../../core/utils/tool_calls_parser.dart';
import 'navigation_service.dart'; import 'navigation_service.dart';
import 'conversation_delta_listener.dart'; import 'conversation_delta_listener.dart';
@@ -252,7 +251,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
onError: persistentController.addError, onError: persistentController.addError,
); );
InactivityWatchdog? socketWatchdog;
// Socket subscriptions list - starts empty so non-socket flows can finish via onComplete. // Socket subscriptions list - starts empty so non-socket flows can finish via onComplete.
// HTTP subscription is tracked separately and cleaned up in disposeSocketSubscriptions. // HTTP subscription is tracked separately and cleaned up in disposeSocketSubscriptions.
final socketSubscriptions = <VoidCallback>[]; final socketSubscriptions = <VoidCallback>[];
@@ -364,75 +362,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
} }
if (hasSocketSignals) { if (hasSocketSignals) {
// Adaptive inactivity timeout based on model capabilities.
// Reasoning models and tool-enabled flows need longer windows as they may
// have longer gaps between tokens during processing.
final watchdogWindow = modelUsesReasoning || toolsEnabled
? const Duration(seconds: 30) // Longer for reasoning/tools
: const Duration(seconds: 15); // Standard for regular models
final watchdogCap = modelUsesReasoning || toolsEnabled
? const Duration(minutes: 10) // Longer cap for complex operations
: const Duration(minutes: 5);
DebugLogger.log(
'Initializing watchdog',
scope: 'streaming/helper',
data: {
'windowSeconds': watchdogWindow.inSeconds,
'capMinutes': watchdogCap.inMinutes,
'modelUsesReasoning': modelUsesReasoning,
'toolsEnabled': toolsEnabled,
},
);
// Inactivity timeout - if no data arrives within window, poll server
// and finish streaming. This handles stuck connections (issue #172).
socketWatchdog = InactivityWatchdog(
window: watchdogWindow,
absoluteCap: watchdogCap,
onTimeout: () async {
DebugLogger.log(
'Socket watchdog timeout - polling server',
scope: 'streaming/helper',
);
final result = await pollServerForMessage();
if (result != null) {
final applied = applyServerContent(
result.content,
result.followUps,
finishIfDone: false, // We always finish on timeout
isDone: result.isDone,
source: 'Watchdog',
);
if (applied) {
syncImages();
}
}
// Clean up and finish
try {
for (final dispose in socketSubscriptions) {
try {
dispose();
} catch (_) {}
}
socketSubscriptions.clear();
} catch (_) {}
try {
final msgs = getMessages();
if (msgs.isNotEmpty &&
msgs.last.role == 'assistant' &&
msgs.last.isStreaming) {
wrappedFinishStreaming();
}
} catch (_) {}
socketWatchdog?.stop();
},
)..start();
// Handle socket reconnection - update session IDs and check for missed events // Handle socket reconnection - update session IDs and check for missed events
if (socketService != null) { if (socketService != null) {
StreamSubscription<void>? reconnectSub; StreamSubscription<void>? reconnectSub;
@@ -502,7 +431,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
pendingImageSignature = null; pendingImageSignature = null;
lastProcessedImageSignature = null; lastProcessedImageSignature = null;
imageCollectionRequestId = 0; imageCollectionRequestId = 0;
socketWatchdog?.stop();
} }
bool isSearching = false; bool isSearching = false;
@@ -668,7 +596,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
try { try {
if (line is String) { if (line is String) {
final s = line.trim(); final s = line.trim();
socketWatchdog?.ping();
// Enhanced completion detection matching OpenWebUI patterns // Enhanced completion detection matching OpenWebUI patterns
if (s == '[DONE]' || s == 'DONE' || s == 'data: [DONE]') { if (s == '[DONE]' || s == 'DONE' || s == 'data: [DONE]') {
try { try {
@@ -687,7 +614,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
); );
} catch (_) {} } catch (_) {}
wrappedFinishStreaming(); wrappedFinishStreaming();
socketWatchdog?.stop();
return; return;
} }
if (s.startsWith('data:')) { if (s.startsWith('data:')) {
@@ -708,7 +634,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
); );
} catch (_) {} } catch (_) {}
wrappedFinishStreaming(); wrappedFinishStreaming();
socketWatchdog?.stop();
return; return;
} }
try { try {
@@ -763,13 +688,11 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
} }
} }
} else if (line is Map) { } else if (line is Map) {
socketWatchdog?.ping();
if (line['done'] == true) { if (line['done'] == true) {
try { try {
socketService?.offEvent(channel); socketService?.offEvent(channel);
} catch (_) {} } catch (_) {}
wrappedFinishStreaming(); wrappedFinishStreaming();
socketWatchdog?.stop();
return; return;
} }
} }
@@ -779,7 +702,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
try { try {
socketService?.onEvent(channel, handler); socketService?.onEvent(channel, handler);
} catch (_) {} } catch (_) {}
socketWatchdog?.ping();
// Increased timeout to match our more generous streaming timeouts // Increased timeout to match our more generous streaming timeouts
// OpenWebUI doesn't have such aggressive channel timeouts // OpenWebUI doesn't have such aggressive channel timeouts
// Use Timer instead of Future.delayed so it can be cancelled on cleanup // Use Timer instead of Future.delayed so it can be cancelled on cleanup
@@ -787,9 +709,8 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
try { try {
socketService?.offEvent(channel); socketService?.offEvent(channel);
} catch (_) {} } catch (_) {}
socketWatchdog?.stop();
}); });
// Register cleanup so the timer is cancelled if streaming completes early // Register cleanup for socket subscriptions
socketSubscriptions.add(() { socketSubscriptions.add(() {
channelTimeoutTimer.cancel(); channelTimeoutTimer.cancel();
}); });
@@ -815,7 +736,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
} }
final payload = data['data']; final payload = data['data'];
final messageId = ev['message_id']?.toString(); final messageId = ev['message_id']?.toString();
socketWatchdog?.ping();
if (kSocketVerboseLogging && payload is Map) { if (kSocketVerboseLogging && payload is Map) {
DebugLogger.log( DebugLogger.log(
@@ -998,7 +918,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
} }
} }
wrappedFinishStreaming(); wrappedFinishStreaming();
socketWatchdog?.stop();
} }
} }
} else if (type == 'status' && payload != null) { } else if (type == 'status' && payload != null) {
@@ -1177,7 +1096,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
}); });
// Ensure UI exits streaming state // Ensure UI exits streaming state
wrappedFinishStreaming(); wrappedFinishStreaming();
socketWatchdog?.stop();
} else if ((type == 'chat:message:delta' || type == 'message') && } else if ((type == 'chat:message:delta' || type == 'message') &&
payload != null) { payload != null) {
// Incremental message content over socket // Incremental message content over socket
@@ -1366,7 +1284,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
requireFocus: false, requireFocus: false,
), ),
onDelta: (event) { onDelta: (event) {
socketWatchdog?.ping();
chatHandler(event.raw, event.ack); chatHandler(event.raw, event.ack);
}, },
onError: (error, stackTrace) { onError: (error, stackTrace) {
@@ -1396,7 +1313,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
requireFocus: false, requireFocus: false,
), ),
onDelta: (event) { onDelta: (event) {
socketWatchdog?.ping();
channelEventsHandler(event.raw, event.ack); channelEventsHandler(event.raw, event.ack);
}, },
onError: (error, stackTrace) { onError: (error, stackTrace) {
@@ -1511,13 +1427,9 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
if (connected) { if (connected) {
DebugLogger.log( DebugLogger.log(
'Socket recovery successful - restarting watchdog', 'Socket recovery successful',
scope: 'streaming/helper', scope: 'streaming/helper',
); );
// Restart watchdog instead of stopping it - this ensures we
// still have a timeout mechanism if socket recovery succeeds
// but events don't resume (fixes premature watchdog stop bug)
socketWatchdog?.ping();
return; return;
} }
} catch (e) { } catch (e) {
@@ -1531,14 +1443,13 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
disposeSocketSubscriptions(); disposeSocketSubscriptions();
wrappedFinishStreaming(); wrappedFinishStreaming();
Future.microtask(refreshConversationSnapshot); Future.microtask(refreshConversationSnapshot);
socketWatchdog?.stop();
}, },
); );
return ActiveSocketStream( return ActiveSocketStream(
controller: controller, controller: controller,
socketSubscriptions: socketSubscriptions, socketSubscriptions: socketSubscriptions,
disposeWatchdog: () => socketWatchdog?.stop(), disposeWatchdog: () {},
); );
} }

View File

@@ -109,10 +109,6 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
bool _taskStatusCheckInFlight = false; bool _taskStatusCheckInFlight = false;
bool _observedRemoteTask = false; bool _observedRemoteTask = false;
/// Counts consecutive polls where no tasks were observed.
/// Used to trigger fallback server check if task registration was missed.
int _noTaskPollCount = 0;
MarkdownStreamFormatter? _markdownFormatter; MarkdownStreamFormatter? _markdownFormatter;
String? _activeStreamingMessageId; String? _activeStreamingMessageId;
@@ -301,7 +297,6 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
_taskStatusTimer = null; _taskStatusTimer = null;
_taskStatusCheckInFlight = false; _taskStatusCheckInFlight = false;
_observedRemoteTask = false; _observedRemoteTask = false;
_noTaskPollCount = 0;
} }
Future<void> _syncRemoteTaskStatus() async { Future<void> _syncRemoteTaskStatus() async {
@@ -328,26 +323,20 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
if (hasActiveTasks) { if (hasActiveTasks) {
_observedRemoteTask = true; _observedRemoteTask = true;
_noTaskPollCount = 0;
} else {
_noTaskPollCount++;
} }
// When no active tasks and we previously observed tasks, streaming should be done. // When no active tasks and we previously observed tasks, streaming should be done.
final tasksDone = _observedRemoteTask && !hasActiveTasks; final tasksDone = _observedRemoteTask && !hasActiveTasks;
// Fallback: If we've polled exactly 3 times without ever seeing tasks, // Secondary check: fetch conversation from server and compare message state.
// something may be wrong - check server state directly. This catches cases // This catches cases where the done signal was missed AND syncs any missed
// where task registration was completely missed due to socket issues. // content. Only runs when tasks have genuinely completed (were observed and
// Using == 3 instead of >= 3 ensures this only triggers once, not every poll. // are now gone). We intentionally avoid any timed fallback checks here
final fallbackCheck = !_observedRemoteTask && _noTaskPollCount == 3; // because they conflict with legitimate slow task registration scenarios
// like web search, which can take a long time to start on the server.
// Secondary check: fetch conversation from server and compare message state // Note: If a socket connection silently fails before tasks complete, the
// This catches cases where the done signal was missed AND syncs any missed content. // user can cancel via the stop button or navigate away to recover.
// Runs when: if (_hasStreamingAssistant && tasksDone) {
// 1. Tasks completed (were observed and are now gone), OR
// 2. Fallback: No tasks ever observed after exactly 3 polls (one-time check)
if (_hasStreamingAssistant && (tasksDone || fallbackCheck)) {
try { try {
final serverConversation = await api.getConversation( final serverConversation = await api.getConversation(
activeConversation.id, activeConversation.id,