diff --git a/lib/core/services/socket_service.dart b/lib/core/services/socket_service.dart index e453088..8bfa03b 100644 --- a/lib/core/services/socket_service.dart +++ b/lib/core/services/socket_service.dart @@ -176,7 +176,10 @@ class SocketService with WidgetsBindingObserver { requireFocus: requireFocus, handler: handler, ); - return SocketEventSubscription(() => _chatEventHandlers.remove(id)); + return SocketEventSubscription( + () => _chatEventHandlers.remove(id), + handlerId: id, + ); } SocketEventSubscription addChannelEventHandler({ @@ -193,7 +196,10 @@ class SocketService with WidgetsBindingObserver { requireFocus: requireFocus, handler: handler, ); - return SocketEventSubscription(() => _channelEventHandlers.remove(id)); + return SocketEventSubscription( + () => _channelEventHandlers.remove(id), + handlerId: id, + ); } void clearChatEventHandlers() { @@ -204,6 +210,66 @@ class SocketService with WidgetsBindingObserver { _channelEventHandlers.clear(); } + /// Update the session ID for a chat event handler registration. + /// Used when socket reconnects and gets a new session ID. + void updateChatHandlerSessionId(String handlerId, String newSessionId) { + final existing = _chatEventHandlers[handlerId]; + if (existing != null) { + _chatEventHandlers[handlerId] = _ChatEventRegistration( + id: existing.id, + conversationId: existing.conversationId, + sessionId: newSessionId, + requireFocus: existing.requireFocus, + handler: existing.handler, + ); + } + } + + /// Update the session ID for a channel event handler registration. + /// Used when socket reconnects and gets a new session ID. + void updateChannelHandlerSessionId(String handlerId, String newSessionId) { + final existing = _channelEventHandlers[handlerId]; + if (existing != null) { + _channelEventHandlers[handlerId] = _ChannelEventRegistration( + id: existing.id, + conversationId: existing.conversationId, + sessionId: newSessionId, + requireFocus: existing.requireFocus, + handler: existing.handler, + ); + } + } + + /// Update session IDs for all handlers matching a conversation ID. + /// Called after socket reconnection to update handlers with the new session. + void updateSessionIdForConversation( + String conversationId, + String newSessionId, + ) { + for (final entry in _chatEventHandlers.entries.toList()) { + if (entry.value.conversationId == conversationId) { + _chatEventHandlers[entry.key] = _ChatEventRegistration( + id: entry.value.id, + conversationId: entry.value.conversationId, + sessionId: newSessionId, + requireFocus: entry.value.requireFocus, + handler: entry.value.handler, + ); + } + } + for (final entry in _channelEventHandlers.entries.toList()) { + if (entry.value.conversationId == conversationId) { + _channelEventHandlers[entry.key] = _ChannelEventRegistration( + id: entry.value.id, + conversationId: entry.value.conversationId, + sessionId: newSessionId, + requireFocus: entry.value.requireFocus, + handler: entry.value.handler, + ); + } + } + } + // Subscribe to an arbitrary socket.io event (used for dynamic tool channels) void onEvent(String eventName, void Function(dynamic data) handler) { _socket?.on(eventName, handler); @@ -378,19 +444,29 @@ class SocketService with WidgetsBindingObserver { incomingSessionId != null && registeredSessionId == incomingSessionId; + // Must match either conversation or session to be considered if (!matchesConversation && !matchesSession) { return false; } + // If no focus requirement, always deliver if (!requireFocus) { return true; } + // Session-targeted messages always bypass focus check (critical for + // background streaming - done/delta events must arrive even when backgrounded) if (matchesSession) { - // Session-targeted messages should always pass through even if unfocused return true; } + // FIX for issue #172: If conversation matches (even without session match), + // still deliver when app is in foreground. This handles socket reconnection + // where session_id changes but chat_id stays the same. + if (matchesConversation && registeredConversationId != null) { + return _isAppForeground; + } + return _isAppForeground; } @@ -487,9 +563,10 @@ class SocketService with WidgetsBindingObserver { } class SocketEventSubscription { - SocketEventSubscription(this._dispose); + SocketEventSubscription(this._dispose, {this.handlerId}); final VoidCallback _dispose; + final String? handlerId; bool _isDisposed = false; void dispose() { diff --git a/lib/core/services/streaming_helper.dart b/lib/core/services/streaming_helper.dart index 17c411e..f22349d 100644 --- a/lib/core/services/streaming_helper.dart +++ b/lib/core/services/streaming_helper.dart @@ -123,6 +123,55 @@ class ActiveSocketStream { final VoidCallback disposeWatchdog; } +/// Helper to handle reconnect recovery asynchronously with proper error handling. +/// Extracted to avoid async callback in Timer which silently drops the Future. +Future _handleReconnectRecovery({ + required bool Function() hasFinished, + required List Function() getMessages, + required Future<({String content, List followUps, bool isDone})?> + Function() + pollServerForMessage, + required bool Function( + String, + List, { + required bool finishIfDone, + required bool isDone, + required String source, + }) + applyServerContent, + required void Function() syncImages, +}) async { + try { + if (hasFinished()) return; + + final msgs = getMessages(); + if (msgs.isEmpty || + msgs.last.role != 'assistant' || + !msgs.last.isStreaming) { + return; + } + + final result = await pollServerForMessage(); + if (hasFinished()) return; + + if (result != null) { + final applied = applyServerContent( + result.content, + result.followUps, + finishIfDone: true, + isDone: result.isDone, + source: 'Reconnect recovery', + ); + if (applied) { + syncImages(); + } + } + } catch (e) { + // Log error but don't crash - reconnect recovery is best-effort + DebugLogger.log('Reconnect recovery failed: $e', scope: 'streaming/helper'); + } +} + /// Unified streaming helper for chat send/regenerate flows. /// /// This attaches WebSocket event handlers and manages background search/image-gen @@ -203,19 +252,124 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ final socketSubscriptions = []; final hasSocketSignals = socketService != null || registerDeltaListener != null; + + // Reference to image sync function - initialized to no-op, reassigned after definition. + // Must not be `late` to avoid LateInitializationError if callbacks fire early. + void Function() syncImages = () {}; + + // Shared helper to poll server for message content. + // Used by watchdog timeout and reconnection handler to recover from missed events. + // Returns (content, followUps, isDone) or null if fetch fails or message not found. + Future<({String content, List followUps, bool isDone})?> + pollServerForMessage() async { + try { + final chatId = activeConversationId; + if (chatId == null || chatId.isEmpty) return null; + + final resp = await api.dio.get('/api/v1/chats/$chatId'); + final data = resp.data as Map?; + final chatObj = data?['chat'] as Map?; + if (chatObj == null) return null; + + final list = chatObj['messages']; + if (list is! List) return null; + + final serverMsg = list.firstWhere( + (m) => m is Map && m['id']?.toString() == assistantMessageId, + orElse: () => null, + ); + if (serverMsg == null || serverMsg is! Map) return null; + + // Extract content + final serverContent = serverMsg['content']; + String content = ''; + if (serverContent is String) { + content = serverContent; + } else if (serverContent is List) { + final textItem = serverContent.firstWhere( + (i) => i is Map && i['type'] == 'text', + orElse: () => null, + ); + if (textItem != null) { + content = textItem['text']?.toString() ?? ''; + } + } + + // Extract follow-ups (check both camelCase and snake_case keys) + // Use _parseFollowUpsField for consistent parsing with socket handler + final followUpsRaw = serverMsg['followUps'] ?? serverMsg['follow_ups']; + final followUps = _parseFollowUpsField(followUpsRaw); + + // Check completion status + final isDone = + serverMsg['done'] == true || + (serverMsg['isStreaming'] != true && content.isNotEmpty); + + return (content: content, followUps: followUps, isDone: isDone); + } catch (e) { + DebugLogger.log('Server poll failed: $e', scope: 'streaming/helper'); + return null; + } + } + + // Helper to apply server content if it's better than local. + // Returns true if content was applied, so caller can trigger image sync. + bool applyServerContent( + String content, + List followUps, { + required bool finishIfDone, + required bool isDone, + required String source, + }) { + final msgs = getMessages(); + if (msgs.isEmpty || msgs.last.role != 'assistant') return false; + + if (content.isNotEmpty && content.length >= msgs.last.content.length) { + DebugLogger.log( + '$source: adopting server content (${content.length} chars)', + scope: 'streaming/helper', + ); + replaceLastMessageContent(content); + + if (followUps.isNotEmpty) { + setFollowUps(assistantMessageId, followUps); + } + + if (finishIfDone && isDone && msgs.last.isStreaming) { + wrappedFinishStreaming(); + } + return true; + } + return false; + } + if (hasSocketSignals) { - // Use a short inactivity timeout - if no data arrives for 10 seconds, - // something is likely wrong with the connection. Combined with 1-second - // polling and server state sync, this provides fast recovery. + // Inactivity timeout - if no data arrives for 10 seconds, poll server + // and finish streaming. This handles stuck connections (issue #172). socketWatchdog = InactivityWatchdog( window: const Duration(seconds: 10), - // Absolute cap ensures streaming never gets stuck indefinitely absoluteCap: const Duration(minutes: 5), - onTimeout: () { + onTimeout: () async { DebugLogger.log( - 'Socket watchdog timeout - finishing streaming gracefully', + 'Socket watchdog timeout - polling server', scope: 'streaming/helper', ); + + final result = await pollServerForMessage(); + if (result != null) { + final applied = applyServerContent( + result.content, + result.followUps, + finishIfDone: false, // We always finish on timeout + isDone: result.isDone, + source: 'Watchdog', + ); + if (applied) { + syncImages(); + } + } + + // Clean up and finish try { for (final dispose in socketSubscriptions) { try { @@ -224,6 +378,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ } socketSubscriptions.clear(); } catch (_) {} + try { final msgs = getMessages(); if (msgs.isNotEmpty && @@ -236,103 +391,43 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ }, )..start(); - // Subscribe to socket reconnection events to sync state after reconnect. - // This catches cases where the done signal was missed due to disconnection. + // Handle socket reconnection - update session IDs and check for missed events if (socketService != null) { StreamSubscription? reconnectSub; Timer? reconnectDelayTimer; - var reconnectSubDisposed = false; reconnectSub = socketService.onReconnect.listen((_) { DebugLogger.log( - 'Socket reconnected - checking server state for missed signals', + 'Socket reconnected - updating session ID', scope: 'streaming/helper', ); - // Cancel any pending timer from a previous reconnect + // Update handler registrations with new session ID (issue #172 fix) + final newSessionId = socketService.sessionId; + final convId = activeConversationId; + if (newSessionId != null && convId != null && convId.isNotEmpty) { + socketService.updateSessionIdForConversation(convId, newSessionId); + } + + // Brief delay then check server for missed completion reconnectDelayTimer?.cancel(); - - // After reconnection, give a brief moment for any queued events - // then check server state to catch any missed completion signals. - // Use Timer instead of Future.delayed so it can be cancelled on dispose. - reconnectDelayTimer = Timer(const Duration(milliseconds: 500), () async { - // Check if disposed before executing - if (reconnectSubDisposed || hasFinished) return; - - // Check current state before making the async call - var msgs = getMessages(); - if (msgs.isEmpty || msgs.last.role != 'assistant') return; - if (!msgs.last.isStreaming) return; - - // Fetch conversation from server to check if streaming actually completed - try { - final chatId = activeConversationId; - if (chatId != null && chatId.isNotEmpty) { - final resp = await api.dio.get('/api/v1/chats/$chatId'); - - // Re-check state after async call - it may have changed or been disposed - if (reconnectSubDisposed || hasFinished) return; - msgs = getMessages(); - if (msgs.isEmpty || msgs.last.role != 'assistant') return; - if (!msgs.last.isStreaming) return; - - final data = resp.data as Map?; - final chatObj = data?['chat'] as Map?; - - if (chatObj != null) { - // Check if server has the completed message - final list = chatObj['messages']; - if (list is List) { - final serverMsg = list.firstWhere( - (m) => - m is Map && m['id']?.toString() == assistantMessageId, - orElse: () => null, - ); - if (serverMsg != null && serverMsg is Map) { - final serverContent = serverMsg['content']; - String content = ''; - if (serverContent is String) { - content = serverContent; - } else if (serverContent is List) { - final textItem = serverContent.firstWhere( - (i) => i is Map && i['type'] == 'text', - orElse: () => null, - ); - if (textItem != null) { - content = textItem['text']?.toString() ?? ''; - } - } - - // If server has content, adopt it and finish streaming - // Use current msgs (re-fetched after await) for comparison - if (content.isNotEmpty && - content.length >= msgs.last.content.length) { - DebugLogger.log( - 'Reconnect recovery: adopting server content (${content.length} chars)', - scope: 'streaming/helper', - ); - replaceLastMessageContent(content); - wrappedFinishStreaming(); - } - } - } - } - } - } catch (e) { - DebugLogger.log( - 'Reconnect recovery fetch failed: $e', - scope: 'streaming/helper', - ); - } + reconnectDelayTimer = Timer(const Duration(milliseconds: 500), () { + // Wrap async work in unawaited to handle errors properly + unawaited( + _handleReconnectRecovery( + hasFinished: () => hasFinished, + getMessages: getMessages, + pollServerForMessage: pollServerForMessage, + applyServerContent: applyServerContent, + syncImages: syncImages, + ), + ); }); }); socketSubscriptions.add(() { - reconnectSubDisposed = true; reconnectDelayTimer?.cancel(); - reconnectDelayTimer = null; reconnectSub?.cancel(); - reconnectSub = null; }); } } @@ -474,6 +569,9 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ } catch (_) {} } + // Bind the late reference now that updateImagesFromCurrentContent is defined + syncImages = updateImagesFromCurrentContent; + bool refreshingSnapshot = false; Future refreshConversationSnapshot() async { if (refreshingSnapshot) return; @@ -642,12 +740,17 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ socketWatchdog?.ping(); // Increased timeout to match our more generous streaming timeouts // OpenWebUI doesn't have such aggressive channel timeouts - Future.delayed(const Duration(minutes: 12), () { + // Use Timer instead of Future.delayed so it can be cancelled on cleanup + final channelTimeoutTimer = Timer(const Duration(minutes: 12), () { try { socketService?.offEvent(channel); } catch (_) {} socketWatchdog?.stop(); }); + // Register cleanup so the timer is cancelled if streaming completes early + socketSubscriptions.add(() { + channelTimeoutTimer.cancel(); + }); } void chatHandler( @@ -1311,12 +1414,30 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ } }, onComplete: () { - // HTTP stream completed - cleanup already done in onDone handler. - // For WebSocket flows, actual completion is handled by socket events (done: true). - // Only finish streaming here if there are no socket subscriptions (simple/legacy flow). + // HTTP stream completed. + // With WebSocket-based streaming, HTTP closes immediately after returning task_id. + // All actual content comes via WebSocket events, so we should NOT finish streaming + // here if socket subscriptions are active - the socket done:true event will finish it. + DebugLogger.log( + 'HTTP stream complete ' + '(socketSubs=${socketSubscriptions.length}, socketConnected=${socketService?.isConnected})', + scope: 'streaming/helper', + ); + + // Only finish streaming if no socket subscriptions are active. + // If sockets are active, they will handle the completion via done:true event. if (socketSubscriptions.isEmpty) { + DebugLogger.log( + 'No socket subscriptions - finishing streaming on HTTP complete', + scope: 'streaming/helper', + ); wrappedFinishStreaming(); Future.microtask(refreshConversationSnapshot); + } else { + DebugLogger.log( + 'Socket subscriptions active - waiting for socket done signal', + scope: 'streaming/helper', + ); } }, onError: (error, stackTrace) async { @@ -1334,10 +1455,10 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ // Check if this is a recoverable error (network issues, etc.) final errorText = error.toString(); final isRecoverable = - (error is! FormatException && - errorText.contains('SocketException')) || - errorText.contains('TimeoutException') || - errorText.contains('HandshakeException'); + error is! FormatException && + (errorText.contains('SocketException') || + errorText.contains('TimeoutException') || + errorText.contains('HandshakeException')); if (isRecoverable && socketService != null) { // Try to recover via socket connection if available diff --git a/lib/core/utils/inactivity_watchdog.dart b/lib/core/utils/inactivity_watchdog.dart index 8d0f71c..c118654 100644 --- a/lib/core/utils/inactivity_watchdog.dart +++ b/lib/core/utils/inactivity_watchdog.dart @@ -5,6 +5,9 @@ import 'dart:async'; /// Call [ping] whenever activity occurs. If no activity happens /// within [window], [onTimeout] fires. Optionally, an [absoluteCap] /// enforces a maximum total duration regardless of activity. +/// +/// The [onTimeout] callback can be sync or async - if async, it will be +/// awaited before the watchdog considers itself fully stopped. class InactivityWatchdog { InactivityWatchdog({ required Duration window, @@ -13,16 +16,20 @@ class InactivityWatchdog { }) : _window = window, _absoluteCap = absoluteCap; - final void Function() onTimeout; + final FutureOr Function() onTimeout; Duration _window; Duration? _absoluteCap; Timer? _timer; Timer? _absoluteTimer; bool _started = false; + bool _firing = false; Duration get window => _window; + /// Whether the timeout callback is currently executing. + bool get isFiring => _firing; + void setWindow(Duration newWindow) { _window = newWindow; if (_started) { @@ -43,6 +50,8 @@ class InactivityWatchdog { void start() { if (_started) return; + // Prevent restart while callback is still executing to avoid double-fire + if (_firing) return; _started = true; _restart(); if (_absoluteCap != null) { @@ -51,6 +60,8 @@ class InactivityWatchdog { } void ping() { + // Prevent restart while callback is still executing to avoid double-fire + if (_firing) return; if (!_started) { start(); return; @@ -73,10 +84,24 @@ class InactivityWatchdog { _timer = Timer(_window, _fire); } + /// Synchronous entry point called by Timer. Kicks off async work. void _fire() { + if (_firing) return; // Prevent re-entry + _firing = true; stop(); + // Execute the callback asynchronously. We don't await because Timer + // expects a sync callback, but the async work will complete in background. + _executeCallback(); + } + + /// Executes the timeout callback asynchronously. + Future _executeCallback() async { try { - onTimeout(); - } catch (_) {} + await onTimeout(); + } catch (_) { + // Swallow errors to prevent unhandled exceptions + } finally { + _firing = false; + } } } diff --git a/lib/features/chat/providers/chat_providers.dart b/lib/features/chat/providers/chat_providers.dart index 5c93338..d1dac03 100644 --- a/lib/features/chat/providers/chat_providers.dart +++ b/lib/features/chat/providers/chat_providers.dart @@ -355,29 +355,41 @@ class ChatMessagesNotifier extends Notifier> { final serverMessages = serverConversation.messages; if (serverMessages.isNotEmpty && state.isNotEmpty) { - final serverLast = serverMessages.last; final localLast = state.last; - // If server has the same message but says it's not streaming, - // or server has more content, sync from server - if (serverLast.id == localLast.id && - serverLast.role == 'assistant') { - final serverDone = !serverLast.isStreaming; - final serverHasMoreContent = - serverLast.content.length > localLast.content.length; + // Case 1: Server has more messages than local - streaming must be done + if (serverMessages.length > state.length) { + DebugLogger.log( + 'Server sync: server has more messages ' + '(${serverMessages.length} vs ${state.length})', + scope: 'chat/providers', + ); + state = serverMessages; + _cancelMessageStream(); + return; + } - if (serverDone || serverHasMoreContent) { - DebugLogger.log( - 'Server sync: adopting server state ' - '(serverDone=$serverDone, serverHasMore=$serverHasMoreContent)', - scope: 'chat/providers', - ); - state = serverMessages; - // Always cancel local streaming when adopting server state. - // If serverDone, streaming is complete. If serverHasMoreContent, - // we've adopted server content and continuing local streaming - // could cause conflicts or duplicate content. - _cancelMessageStream(); + // Case 2: Find the local streaming message in server messages by ID + // This handles cases where last messages differ + if (localLast.role == 'assistant' && localLast.isStreaming) { + final serverVersion = serverMessages + .where((m) => m.id == localLast.id) + .firstOrNull; + + if (serverVersion != null) { + final serverDone = !serverVersion.isStreaming; + final serverHasMoreContent = + serverVersion.content.length > localLast.content.length; + + if (serverDone || serverHasMoreContent) { + DebugLogger.log( + 'Server sync: adopting server state ' + '(serverDone=$serverDone, serverHasMore=$serverHasMoreContent)', + scope: 'chat/providers', + ); + state = serverMessages; + _cancelMessageStream(); + } } } }