diff --git a/lib/core/services/socket_service.dart b/lib/core/services/socket_service.dart index 7e8bf26..e453088 100644 --- a/lib/core/services/socket_service.dart +++ b/lib/core/services/socket_service.dart @@ -30,6 +30,13 @@ class SocketService with WidgetsBindingObserver { final Map _channelEventHandlers = {}; int _handlerSeed = 0; + /// Stream controller that emits when a socket reconnection occurs. + /// Listeners can use this to sync state after a reconnect. + final _reconnectController = StreamController.broadcast(); + + /// Stream that emits when a socket reconnection occurs. + Stream get onReconnect => _reconnectController.stream; + SocketService({ required this.serverConfig, String? authToken, @@ -214,6 +221,7 @@ class SocketService with WidgetsBindingObserver { WidgetsBinding.instance.removeObserver(this); _chatEventHandlers.clear(); _channelEventHandlers.clear(); + _reconnectController.close(); } // Best-effort: ensure there is an active connection and wait briefly. @@ -279,6 +287,10 @@ class SocketService with WidgetsBindingObserver { 'auth': {'token': _authToken}, }); } + // Notify listeners that a reconnection occurred so they can refresh state + if (!_reconnectController.isClosed) { + _reconnectController.add(null); + } } void _handleConnectError(dynamic err) {} diff --git a/lib/core/services/streaming_helper.dart b/lib/core/services/streaming_helper.dart index d450d0d..c22f6af 100644 --- a/lib/core/services/streaming_helper.dart +++ b/lib/core/services/streaming_helper.dart @@ -204,9 +204,12 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ final hasSocketSignals = socketService != null || registerDeltaListener != null; if (hasSocketSignals) { - // Increase timeout to match OpenWebUI's more generous timeouts for long responses + // Use a reasonable inactivity timeout - if no data arrives for 45 seconds, + // something is likely wrong with the connection socketWatchdog = InactivityWatchdog( - window: const Duration(minutes: 15), // Increased from 5 to 15 minutes + window: const Duration(seconds: 45), + // Absolute cap ensures streaming never gets stuck indefinitely + absoluteCap: const Duration(minutes: 10), onTimeout: () { DebugLogger.log( 'Socket watchdog timeout - finishing streaming gracefully', @@ -231,6 +234,106 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ socketWatchdog?.stop(); }, )..start(); + + // Subscribe to socket reconnection events to sync state after reconnect. + // This catches cases where the done signal was missed due to disconnection. + if (socketService != null) { + StreamSubscription? reconnectSub; + Timer? reconnectDelayTimer; + var reconnectSubDisposed = false; + + reconnectSub = socketService.onReconnect.listen((_) { + DebugLogger.log( + 'Socket reconnected - checking server state for missed signals', + scope: 'streaming/helper', + ); + + // Cancel any pending timer from a previous reconnect + reconnectDelayTimer?.cancel(); + + // After reconnection, give a brief moment for any queued events + // then check server state to catch any missed completion signals. + // Use Timer instead of Future.delayed so it can be cancelled on dispose. + reconnectDelayTimer = Timer(const Duration(milliseconds: 500), () async { + // Check if disposed before executing + if (reconnectSubDisposed || hasFinished) return; + + // Check current state before making the async call + var msgs = getMessages(); + if (msgs.isEmpty || msgs.last.role != 'assistant') return; + if (!msgs.last.isStreaming) return; + + // Fetch conversation from server to check if streaming actually completed + try { + final chatId = activeConversationId; + if (chatId != null && chatId.isNotEmpty) { + final resp = await api.dio.get('/api/v1/chats/$chatId'); + + // Re-check state after async call - it may have changed or been disposed + if (reconnectSubDisposed || hasFinished) return; + msgs = getMessages(); + if (msgs.isEmpty || msgs.last.role != 'assistant') return; + if (!msgs.last.isStreaming) return; + + final data = resp.data as Map?; + final chatObj = data?['chat'] as Map?; + + if (chatObj != null) { + // Check if server has the completed message + final list = chatObj['messages']; + if (list is List) { + final serverMsg = list.firstWhere( + (m) => + m is Map && m['id']?.toString() == assistantMessageId, + orElse: () => null, + ); + if (serverMsg != null && serverMsg is Map) { + final serverContent = serverMsg['content']; + String content = ''; + if (serverContent is String) { + content = serverContent; + } else if (serverContent is List) { + final textItem = serverContent.firstWhere( + (i) => i is Map && i['type'] == 'text', + orElse: () => null, + ); + if (textItem != null) { + content = textItem['text']?.toString() ?? ''; + } + } + + // If server has content, adopt it and finish streaming + // Use current msgs (re-fetched after await) for comparison + if (content.isNotEmpty && + content.length >= msgs.last.content.length) { + DebugLogger.log( + 'Reconnect recovery: adopting server content (${content.length} chars)', + scope: 'streaming/helper', + ); + replaceLastMessageContent(content); + wrappedFinishStreaming(); + } + } + } + } + } + } catch (e) { + DebugLogger.log( + 'Reconnect recovery fetch failed: $e', + scope: 'streaming/helper', + ); + } + }); + }); + + socketSubscriptions.add(() { + reconnectSubDisposed = true; + reconnectDelayTimer?.cancel(); + reconnectDelayTimer = null; + reconnectSub?.cancel(); + reconnectSub = null; + }); + } } Timer? imageCollectionDebounce; diff --git a/lib/features/chat/providers/chat_providers.dart b/lib/features/chat/providers/chat_providers.dart index 9aced83..e30e447 100644 --- a/lib/features/chat/providers/chat_providers.dart +++ b/lib/features/chat/providers/chat_providers.dart @@ -135,7 +135,12 @@ class ChatMessagesNotifier extends Notifier> { final serverMessages = next?.messages ?? const []; // Primary rule: adopt server messages when there are strictly more of them. if (serverMessages.length > state.length) { + // Check streaming state BEFORE updating state + final needsCleanup = _shouldCleanupStreamingFromServer( + serverMessages, + ); state = serverMessages; + if (needsCleanup) _cancelMessageStream(); return; } @@ -152,10 +157,20 @@ class ChatMessagesNotifier extends Notifier> { serverText.isNotEmpty && serverText.length > localText.length; final localEmptyButServerHas = localText.isEmpty && serverText.isNotEmpty; + // Also recover if server says streaming is done but local still streaming + final serverDoneButLocalStreaming = + !serverLast.isStreaming && localLast.isStreaming; if (sameLastId && isAssistant && - (serverHasMore || localEmptyButServerHas)) { + (serverHasMore || + localEmptyButServerHas || + serverDoneButLocalStreaming)) { + // Check streaming state BEFORE updating state + final needsCleanup = _shouldCleanupStreamingFromServer( + serverMessages, + ); state = serverMessages; + if (needsCleanup) _cancelMessageStream(); return; } } @@ -217,6 +232,44 @@ class ChatMessagesNotifier extends Notifier> { _activeStreamingMessageId = null; } + /// Checks if streaming cleanup is needed when adopting server messages. + /// Must be called BEFORE updating state, as it compares current local state + /// with incoming server state. + bool _shouldCleanupStreamingFromServer(List serverMessages) { + if (serverMessages.isEmpty) return false; + if (!_hasStreamingAssistant) return false; + + // Find the local streaming assistant message + final localStreamingMsg = state.lastWhere( + (m) => m.role == 'assistant' && m.isStreaming, + orElse: () => state.last, + ); + + // Find the same message in server messages by ID + final serverMsg = serverMessages.where((m) => m.id == localStreamingMsg.id); + if (serverMsg.isNotEmpty && !serverMsg.first.isStreaming) { + DebugLogger.log( + 'Server indicates streaming complete for message ${localStreamingMsg.id}', + scope: 'chat/providers', + ); + return true; + } + + // Also check if server has MORE messages than local - if so, streaming must be done + // (e.g., server has [assistant(done), user] but local only has [assistant(streaming)]) + if (serverMessages.length > state.length) { + // Server has additional messages, so any local streaming must have completed + DebugLogger.log( + 'Server has more messages (${serverMessages.length} vs ${state.length}) - ' + 'streaming must be complete', + scope: 'chat/providers', + ); + return true; + } + + return false; + } + bool get _hasStreamingAssistant { if (state.isEmpty) return false; final last = state.last; @@ -262,16 +315,63 @@ class ChatMessagesNotifier extends Notifier> { _taskStatusCheckInFlight = true; try { + // Check both task status and server message state final taskIds = await api.getTaskIdsByChat(activeConversation.id); - if (taskIds.isEmpty) { - if (_observedRemoteTask && _hasStreamingAssistant) { - finishStreaming(); - } else if (!_observedRemoteTask) { - // No tasks reported yet; keep monitoring to allow registration. - } - } else { + final hasActiveTasks = taskIds.isNotEmpty; + + if (hasActiveTasks) { _observedRemoteTask = true; } + + // When no active tasks and we previously observed tasks, streaming should be done. + // Run secondary check to sync any missed content from server. + final tasksDone = _observedRemoteTask && !hasActiveTasks; + + // Secondary check: fetch conversation from server and compare message state + // This catches cases where the done signal was missed AND syncs any missed content. + // Only run when tasks have actually completed (were observed and are now gone), + // not on every poll before tasks are registered. + if (_hasStreamingAssistant && tasksDone) { + try { + final serverConversation = await api.getConversation( + activeConversation.id, + ); + final serverMessages = serverConversation.messages; + + if (serverMessages.isNotEmpty && state.isNotEmpty) { + final serverLast = serverMessages.last; + final localLast = state.last; + + // If server has the same message but says it's not streaming, + // or server has more content, sync from server + if (serverLast.id == localLast.id && + serverLast.role == 'assistant') { + final serverDone = !serverLast.isStreaming; + final serverHasMoreContent = + serverLast.content.length > localLast.content.length; + + if (serverDone || serverHasMoreContent) { + DebugLogger.log( + 'Server sync: adopting server state ' + '(serverDone=$serverDone, serverHasMore=$serverHasMoreContent)', + scope: 'chat/providers', + ); + state = serverMessages; + // Always cancel local streaming when adopting server state. + // If serverDone, streaming is complete. If serverHasMoreContent, + // we've adopted server content and continuing local streaming + // could cause conflicts or duplicate content. + _cancelMessageStream(); + } + } + } + } catch (e) { + DebugLogger.log( + 'Server conversation fetch failed: $e', + scope: 'chat/providers', + ); + } + } } catch (err, stack) { DebugLogger.log('Task status poll failed: $err', scope: 'chat/provider'); debugPrintStack(stackTrace: stack);