From 61a3fcc83ae17ae41cb691db06fdd29ff300844b Mon Sep 17 00:00:00 2001 From: cogwheel0 <172976095+cogwheel0@users.noreply.github.com> Date: Thu, 27 Nov 2025 14:36:13 +0530 Subject: [PATCH] feat(streaming): Simplify streaming logic and remove persistent tracking --- android/app/build.gradle.kts | 2 +- lib/core/providers/app_startup_providers.dart | 12 +- lib/core/router/app_router.dart | 7 +- lib/core/services/api_service.dart | 144 ++-- .../persistent_streaming_service.dart | 627 ------------------ lib/core/services/socket_service.dart | 2 + lib/core/services/sse_stream_parser.dart | 202 ------ lib/core/services/streaming_helper.dart | 141 ++-- .../streaming_response_controller.dart | 2 +- .../chat/providers/chat_providers.dart | 106 ++- lib/features/chat/views/chat_page.dart | 2 +- 11 files changed, 181 insertions(+), 1066 deletions(-) delete mode 100644 lib/core/services/persistent_streaming_service.dart delete mode 100644 lib/core/services/sse_stream_parser.dart diff --git a/android/app/build.gradle.kts b/android/app/build.gradle.kts index 9770bb1..bb3fe90 100644 --- a/android/app/build.gradle.kts +++ b/android/app/build.gradle.kts @@ -17,7 +17,7 @@ if (keystorePropertiesFile.exists()) { android { namespace = "app.cogwheel.conduit" compileSdk = 36 - ndkVersion = "27.0.12077973" + ndkVersion = "29.0.14206865" defaultConfig { applicationId = "app.cogwheel.conduit" diff --git a/lib/core/providers/app_startup_providers.dart b/lib/core/providers/app_startup_providers.dart index aa2aa29..817b747 100644 --- a/lib/core/providers/app_startup_providers.dart +++ b/lib/core/providers/app_startup_providers.dart @@ -13,7 +13,6 @@ import '../services/app_intents_service.dart'; import '../services/quick_actions_service.dart'; import '../models/conversation.dart'; import '../services/background_streaming_handler.dart'; -import '../services/persistent_streaming_service.dart'; import '../services/socket_service.dart'; import '../../features/onboarding/views/onboarding_sheet.dart'; import '../../features/chat/providers/chat_providers.dart'; @@ -198,15 +197,6 @@ class AppStartupFlow extends _$AppStartupFlow { keepAlive(socketPersistenceProvider); }); - // Ensure persistent streaming uses the shared connectivity service - final connectivityService = ref.read(connectivityServiceProvider); - Future.delayed(const Duration(milliseconds: 160), () { - if (!ref.mounted) return; - PersistentStreamingService().attachConnectivityService( - connectivityService, - ); - }); - // Warm the conversations list in the background as soon as possible, // but avoid doing so on poor connectivity to reduce startup load. // Apply a small randomized delay to smooth load spikes across app wakes. @@ -432,7 +422,7 @@ class _ForegroundRefreshObserver extends WidgetsBindingObserver { } /// Attempts to keep the realtime socket connection alive while the app is -/// backgrounded, similar to how PersistentStreamingService works for streams. +/// backgrounded using BackgroundStreamingHandler for platform-specific handling. /// /// Notes: /// - iOS: limited to short background task windows; we send periodic keepAlive. diff --git a/lib/core/router/app_router.dart b/lib/core/router/app_router.dart index 7446559..5b51590 100644 --- a/lib/core/router/app_router.dart +++ b/lib/core/router/app_router.dart @@ -7,9 +7,9 @@ import '../auth/auth_state_manager.dart'; import '../providers/app_providers.dart'; import '../services/connectivity_service.dart'; import '../services/navigation_service.dart'; -import '../services/persistent_streaming_service.dart'; import '../utils/debug_logger.dart'; import '../../features/auth/providers/unified_auth_providers.dart'; +import '../../features/chat/providers/chat_providers.dart'; import '../../features/auth/views/authentication_page.dart'; import '../../features/auth/views/connect_signin_page.dart'; import '../../features/auth/views/connection_issue_page.dart'; @@ -37,6 +37,7 @@ class RouterNotifier extends ChangeNotifier { connectivityStatusProvider, _onStateChanged, ), + ref.listen(isChatStreamingProvider, _onStateChanged), ]; } @@ -112,8 +113,8 @@ class RouterNotifier extends ChangeNotifier { // 2. Connectivity is explicitly offline // 3. Auth is authenticated (don't interrupt auth flow) // 4. App is in foreground and offline warning isn't suppressed - // 5. No active streaming is in progress (avoid interrupting token streams) - final hasActiveStreams = PersistentStreamingService().activeStreamCount > 0; + // 5. No active streaming is in progress (avoid interrupting chat streams) + final hasActiveStreams = ref.read(isChatStreamingProvider); final shouldShowConnectionIssue = !reviewerMode && connectivity == ConnectivityStatus.offline && diff --git a/lib/core/services/api_service.dart b/lib/core/services/api_service.dart index fe02353..4ee5f9c 100644 --- a/lib/core/services/api_service.dart +++ b/lib/core/services/api_service.dart @@ -19,9 +19,7 @@ import '../models/prompt.dart'; import '../auth/api_auth_interceptor.dart'; import '../error/api_error_interceptor.dart'; // Tool-call details are parsed in the UI layer to render collapsible blocks -import 'persistent_streaming_service.dart'; import 'connectivity_service.dart'; -import 'sse_stream_parser.dart'; import '../utils/debug_logger.dart'; import 'conversation_parsing.dart'; import 'worker_manager.dart'; @@ -2596,36 +2594,12 @@ class ApiService { // Chat streaming with conversation context // Track cancellable streaming requests by messageId for stop parity final Map _streamCancelTokens = {}; - final Map _messagePersistentStreamIds = {}; - /// Associates a streaming message with its persistent stream identifier. - void registerPersistentStreamForMessage(String messageId, String streamId) { - _messagePersistentStreamIds[messageId] = streamId; - } - - /// Removes the persistent stream mapping for a message if it matches. - /// - /// Returns the removed persistent stream identifier when one existed and - /// matched the optional [expectedStreamId]. - String? clearPersistentStreamForMessage( - String messageId, { - String? expectedStreamId, - }) { - final current = _messagePersistentStreamIds[messageId]; - if (current == null) { - return null; - } - if (expectedStreamId != null && current != expectedStreamId) { - return null; - } - return _messagePersistentStreamIds.remove(messageId); - } - - // Send message using dual-stream approach (HTTP SSE + WebSocket events). - // Matches OpenWebUI web client behavior: - // - HTTP SSE stream provides immediate content chunks - // - WebSocket events deliver metadata, tool status, sources, follow-ups - // - Both streams run in parallel for reliability + // Send message using WebSocket-only streaming. + // Matches OpenWebUI web client behavior when session_id + chat_id + message_id are provided: + // - HTTP POST returns JSON with task_id (no SSE streaming) + // - All content and metadata delivered via WebSocket events + // - Events: chat:completion, chat:message:delta, status, source, follow_ups, etc. // Returns a record with (stream, messageId, sessionId, socketSessionId, isBackgroundFlow) ({ Stream stream, @@ -2790,7 +2764,7 @@ class ApiService { _traceApi('Including non-image files in request: ${allFiles.length}'); } - _traceApi('Preparing dual-stream chat request (HTTP SSE + WebSocket)'); + _traceApi('Preparing WebSocket-only chat request'); _traceApi('Model: $model'); _traceApi('Message count: ${processedMessages.length}'); @@ -2830,118 +2804,85 @@ class ApiService { ); _traceApi('Has background_tasks: ${data.containsKey('background_tasks')}'); - _traceApi('Initiating dual-stream request (HTTP SSE + WebSocket)'); + _traceApi('Initiating WebSocket-only chat request'); _traceApi('Posting to /api/chat/completions'); // Create a cancel token for this request final cancelToken = CancelToken(); _streamCancelTokens[messageId] = cancelToken; - // Start HTTP SSE stream (matches web client behavior) - // The WebSocket events will run in parallel via streaming_helper.dart + // Send HTTP request to initiate chat task + // With session_id + chat_id + message_id, the server returns a task_id + // and all streaming happens via WebSocket events (not SSE) () async { try { final resp = await _dio.post( '/api/chat/completions', data: data, options: Options( - responseType: ResponseType.stream, - // Extended timeout for streaming responses - allow up to 10 minutes - // for long-running tool calls and reasoning - receiveTimeout: const Duration(minutes: 10), - // Shorter send timeout for the initial request + responseType: ResponseType.json, + receiveTimeout: const Duration(seconds: 30), sendTimeout: const Duration(seconds: 30), - headers: { - 'Accept': 'text/event-stream', - // Enable HTTP keep-alive to maintain connection in background - 'Connection': 'keep-alive', - // Request server to send keep-alive messages - 'Cache-Control': 'no-cache', - }, ), cancelToken: cancelToken, ); final respData = resp.data; - // Check if we got a task_id response (non-streaming) - if (respData is Map && respData['task_id'] != null) { - final taskId = respData['task_id'].toString(); - _traceApi('Background task created: $taskId'); - - // In this case, all streaming will happen via WebSocket - // Close HTTP stream but keep WebSocket active - if (!streamController.isClosed) { - streamController.close(); - } - return; - } - - // We have a streaming response body - if (respData is ResponseBody) { - _traceApi('HTTP SSE stream started for message: $messageId'); - - // Parse SSE stream and forward chunks to controller - await for (final chunk in SSEStreamParser.parseResponseStream( - respData, - splitLargeDeltas: false, - heartbeatTimeout: const Duration(minutes: 2), - onHeartbeat: () { - // Notify persistent streaming service that connection is alive - final persistentStreamId = _messagePersistentStreamIds[messageId]; - if (persistentStreamId != null) { - PersistentStreamingService().updateStreamProgress( - persistentStreamId, - chunkSequence: DateTime.now().millisecondsSinceEpoch, - ); - } - }, - )) { + if (respData is Map) { + if (respData['task_id'] != null) { + final taskId = respData['task_id'].toString(); + _traceApi('Background task created: $taskId'); + } else if (respData['status'] == true) { + _traceApi('Chat task initiated successfully'); + } else if (respData['error'] != null) { + _traceApi('Server error: ${respData['error']}'); if (!streamController.isClosed) { - streamController.add(chunk); - } else { - _traceApi('Stream controller closed, stopping SSE parsing'); - break; + streamController.addError(Exception(respData['error'].toString())); } } - - _traceApi('HTTP SSE stream completed for message: $messageId'); - } else { - _traceApi('Unexpected response type: ${respData.runtimeType}'); } - // Close the HTTP stream controller - // WebSocket events will continue independently via streaming_helper + // Close HTTP stream controller - WebSocket handles all content delivery if (!streamController.isClosed) { streamController.close(); } } on DioException catch (e) { if (CancelToken.isCancel(e)) { - _traceApi('HTTP stream cancelled for message: $messageId'); + _traceApi('Request cancelled for message: $messageId'); } else { - _traceApi('HTTP stream error: $e'); + _traceApi('Request error: $e'); if (!streamController.isClosed) { streamController.addError(e); streamController.close(); } } } catch (e) { - _traceApi('Unexpected error in HTTP stream: $e'); + _traceApi('Unexpected error: $e'); if (!streamController.isClosed) { streamController.addError(e); streamController.close(); } - } finally { - _streamCancelTokens.remove(messageId); } + // Note: Don't remove cancel token here - it should remain until WebSocket + // streaming finishes so Stop button can cancel the active generation. + // Token is removed by clearStreamCancelToken() when streaming completes. }(); + // Determine if this is actually a background flow based on the request payload + final bool isBackgroundFlow = + hasBackgroundTasksPayload || + (toolIds != null && toolIds.isNotEmpty) || + (toolServers != null && toolServers.isNotEmpty) || + enableWebSearch || + enableImageGeneration; + return ( stream: streamController.stream, messageId: messageId, sessionId: sessionId, socketSessionId: socketSessionId, - isBackgroundFlow: true, + isBackgroundFlow: isBackgroundFlow, ); } @@ -2975,13 +2916,12 @@ class ApiService { token.cancel('User cancelled'); } } catch (_) {} + } - try { - final pid = clearPersistentStreamForMessage(messageId); - if (pid != null) { - PersistentStreamingService().unregisterStream(pid); - } - } catch (_) {} + /// Clears the cancel token for a message when streaming completes normally. + /// Called by streaming_helper when finishStreaming is invoked. + void clearStreamCancelToken(String messageId) { + _streamCancelTokens.remove(messageId); } // File upload for RAG diff --git a/lib/core/services/persistent_streaming_service.dart b/lib/core/services/persistent_streaming_service.dart deleted file mode 100644 index e82d6d4..0000000 --- a/lib/core/services/persistent_streaming_service.dart +++ /dev/null @@ -1,627 +0,0 @@ -import 'dart:async'; -import 'package:flutter/material.dart'; -import 'package:wakelock_plus/wakelock_plus.dart'; -import 'background_streaming_handler.dart'; -import 'connectivity_service.dart'; -import '../utils/debug_logger.dart'; - -class PersistentStreamingService with WidgetsBindingObserver { - static final PersistentStreamingService _instance = - PersistentStreamingService._internal(); - factory PersistentStreamingService() => _instance; - PersistentStreamingService._internal() { - _initialize(); - } - - // Active streams registry - final Map _activeStreams = {}; - final Map _streamControllers = {}; - final Map _streamRecoveryCallbacks = {}; - final Map> _streamMetadata = {}; - - // App lifecycle state - // AppLifecycleState? _lastLifecycleState; // Removed as it's unused - bool _isInBackground = false; - Timer? _backgroundTimer; - Timer? _heartbeatTimer; - - // Background streaming handler - late final BackgroundStreamingHandler _backgroundHandler; - - // Connectivity monitoring - StreamSubscription? _connectivitySubscription; - ConnectivityService? _connectivityService; - bool _hasConnectivity = true; - - // Recovery state - final Map _retryAttempts = {}; - static const int _maxRetryAttempts = 3; - static const Duration _retryDelay = Duration(seconds: 2); - - void _initialize() { - WidgetsBinding.instance.addObserver(this); - _backgroundHandler = BackgroundStreamingHandler.instance; - _setupBackgroundHandlerCallbacks(); - _startHeartbeat(); - } - - void _setupBackgroundHandlerCallbacks() { - _backgroundHandler.onServiceFailed = (error, errorType, streamIds) { - DebugLogger.error( - 'background-service-failed', - scope: 'streaming/persistent', - error: '$errorType: $error', - data: {'affectedStreams': streamIds}, - ); - - // Attempt immediate recovery for failed streams - for (final streamId in streamIds) { - final callback = _streamRecoveryCallbacks[streamId]; - if (callback != null) { - // Schedule recovery after a short delay - Future.delayed(const Duration(seconds: 2), () { - if (_activeStreams.containsKey(streamId)) { - _attemptStreamRecovery(streamId, callback); - } - }); - } - } - }; - - _backgroundHandler.onStreamsSuspending = (streamIds) { - DebugLogger.stream( - 'PersistentStreaming: Streams suspending - $streamIds', - ); - // Mark streams as suspended but don't close them yet - for (final streamId in streamIds) { - _markStreamAsSuspended(streamId); - } - }; - - _backgroundHandler.onBackgroundTaskExpiring = () { - DebugLogger.stream('PersistentStreaming: Background task expiring'); - // Save states and prepare for recovery - _saveStreamStatesForRecovery(); - }; - - _backgroundHandler - .onBackgroundTaskExtended = (streamIds, estimatedSeconds) { - DebugLogger.stream( - 'PersistentStreaming: Background task extended for $estimatedSeconds seconds', - ); - // BGTaskScheduler has given us more time - streams can continue - for (final streamId in streamIds) { - final metadata = _streamMetadata[streamId]; - if (metadata != null) { - metadata['bgTaskExtended'] = true; - metadata['bgTaskExtendedAt'] = DateTime.now(); - metadata['bgTaskEstimatedTime'] = estimatedSeconds; - } - } - }; - - _backgroundHandler.onBackgroundKeepAlive = () { - DebugLogger.stream('PersistentStreaming: Background keep-alive signal'); - // BGTaskScheduler is keeping us alive - we can continue streaming - _heartbeatTimer?.cancel(); - _startHeartbeat(); // Restart heartbeat timer - }; - - _backgroundHandler.shouldContinueInBackground = () { - return _activeStreams.isNotEmpty; - }; - } - - void attachConnectivityService(ConnectivityService service) { - if (identical(_connectivityService, service)) { - return; - } - - _connectivitySubscription?.cancel(); - _connectivityService = service; - _connectivitySubscription = service.statusStream - .map((status) => status == ConnectivityStatus.online) - .listen(_handleConnectivityChange); - } - - void _handleConnectivityChange(bool connected) { - final wasConnected = _hasConnectivity; - _hasConnectivity = connected; - - if (!wasConnected && connected) { - DebugLogger.stream( - 'PersistentStreaming: Connectivity restored, recovering streams', - ); - _recoverActiveStreams(); - } else if (wasConnected && !connected) { - DebugLogger.stream( - 'PersistentStreaming: Connectivity lost, suspending streams', - ); - _suspendAllStreams(); - } - } - - void _startHeartbeat() { - _heartbeatTimer = Timer.periodic(const Duration(seconds: 30), (_) { - if (_activeStreams.isNotEmpty && _isInBackground) { - _backgroundHandler.keepAlive(); - - // Check for stale streams during background operation - _checkStreamHealth(); - } - }); - } - - void _checkStreamHealth() { - final now = DateTime.now(); - final staleStreams = []; - - for (final entry in _streamMetadata.entries) { - final streamId = entry.key; - final metadata = entry.value; - final lastUpdate = metadata['lastUpdate'] as DateTime?; - - if (lastUpdate != null) { - final timeSinceUpdate = now.difference(lastUpdate); - - // If no update in 90 seconds while in background, consider stale - if (timeSinceUpdate > const Duration(seconds: 90)) { - DebugLogger.warning( - 'Stream $streamId appears stale: ${timeSinceUpdate.inSeconds}s since last update', - ); - staleStreams.add(streamId); - } - } - } - - // Attempt recovery for stale streams - for (final streamId in staleStreams) { - final callback = _streamRecoveryCallbacks[streamId]; - if (callback != null && _retryAttempts[streamId] == null) { - DebugLogger.stream('Initiating recovery for stale stream: $streamId'); - _attemptStreamRecovery(streamId, callback); - } - } - } - - @override - void didChangeAppLifecycleState(AppLifecycleState state) { - // _lastLifecycleState = state; // Removed as it's unused - - switch (state) { - case AppLifecycleState.paused: - case AppLifecycleState.inactive: - _onAppBackground(); - break; - case AppLifecycleState.resumed: - _onAppForeground(); - break; - case AppLifecycleState.detached: - case AppLifecycleState.hidden: - // Handle app termination - _onAppDetached(); - break; - } - } - - void _onAppBackground() { - DebugLogger.stream('PersistentStreamingService: App went to background'); - _isInBackground = true; - - // Enable wake lock to prevent device sleep during streaming - if (_activeStreams.isNotEmpty) { - _enableWakeLock(); - _startBackgroundExecution(); - } - } - - void _onAppForeground() { - DebugLogger.stream( - 'PersistentStreamingService: App returned to foreground', - ); - _isInBackground = false; - - // Cancel background timer - _backgroundTimer?.cancel(); - _backgroundTimer = null; - - // Disable wake lock if no active streams - if (_activeStreams.isEmpty) { - _disableWakeLock(); - } - - // Close any controllers for streams that were suspended in background - // This allows the onComplete handlers to fire now that we're in foreground - final suspendedStreams = _streamMetadata.entries - .where((e) => e.value['suspended'] == true) - .map((e) => e.key) - .toList(); - - for (final streamId in suspendedStreams) { - final controller = _streamControllers[streamId]; - if (controller != null && !controller.isClosed) { - DebugLogger.stream( - 'PersistentStreamingService: Closing suspended stream $streamId controller in foreground', - ); - controller.close(); - } - } - - // Check and recover any interrupted streams - _recoverActiveStreams(); - } - - void _onAppDetached() { - DebugLogger.stream('PersistentStreamingService: App detached'); - - // Save stream states for recovery - _saveStreamStatesForRecovery(); - - // Clean up - _backgroundTimer?.cancel(); - _heartbeatTimer?.cancel(); - _disableWakeLock(); - } - - // Register a stream for persistent handling - String registerStream({ - required StreamSubscription subscription, - required StreamController controller, - Function? recoveryCallback, - Map? metadata, - }) { - final streamId = DateTime.now().millisecondsSinceEpoch.toString(); - - _activeStreams[streamId] = subscription; - _streamControllers[streamId] = controller; - if (recoveryCallback != null) { - _streamRecoveryCallbacks[streamId] = recoveryCallback; - } - - // Store metadata for recovery - if (metadata != null) { - _streamMetadata[streamId] = metadata; - - // Register with background handler - _backgroundHandler.registerStream( - streamId, - conversationId: metadata['conversationId'] ?? '', - messageId: metadata['messageId'] ?? '', - sessionId: metadata['sessionId'], - lastChunkSequence: metadata['lastChunkSequence'], - lastContent: metadata['lastContent'], - ); - } - - // Enable wake lock when streaming starts - if (_activeStreams.length == 1) { - _enableWakeLock(); - } - - // Start background execution if app is backgrounded - if (_isInBackground) { - _startBackgroundExecution(); - } - - DebugLogger.stream( - 'PersistentStreamingService: Registered stream $streamId', - ); - - return streamId; - } - - // Unregister a stream - void unregisterStream(String streamId, {bool saveForRecovery = false}) { - // If app is in background and stream is unregistering, it might be due to - // network interruption - save state for recovery instead of just dropping it - if (_isInBackground && - !saveForRecovery && - _streamMetadata.containsKey(streamId)) { - DebugLogger.stream( - 'PersistentStreamingService: Stream $streamId interrupted in background, saving for recovery', - ); - // Don't unregister yet - keep it for recovery - _markStreamAsSuspended(streamId); - return; - } - - _activeStreams.remove(streamId); - _streamControllers.remove(streamId); - _streamRecoveryCallbacks.remove(streamId); - _streamMetadata.remove(streamId); - _retryAttempts.remove(streamId); - - // Unregister from background handler - _backgroundHandler.unregisterStream(streamId); - - // Stop background execution if no more streams - if (_activeStreams.isEmpty) { - _backgroundHandler.stopBackgroundExecution([streamId]); - _disableWakeLock(); - } - - DebugLogger.stream( - 'PersistentStreamingService: Unregistered stream $streamId', - ); - } - - // Check if a stream is still active - bool isStreamActive(String streamId) { - return _activeStreams.containsKey(streamId); - } - - // Recover interrupted streams - Future _recoverActiveStreams() async { - if (!_hasConnectivity) { - DebugLogger.stream( - 'PersistentStreaming: No connectivity, skipping recovery', - ); - return; - } - - // First, try to recover from background handler saved states - final savedStates = await _backgroundHandler.recoverStreamStates(); - for (final state in savedStates) { - if (!state.isStale()) { - await _recoverStreamFromState(state); - } - } - - // Then check active streams for recovery - for (final entry in _streamRecoveryCallbacks.entries) { - final streamId = entry.key; - final recoveryCallback = entry.value; - - // Check if stream was interrupted or needs recovery - final subscription = _activeStreams[streamId]; - if (subscription == null || _needsRecovery(streamId)) { - await _attemptStreamRecovery(streamId, recoveryCallback); - } - } - } - - Future _recoverStreamFromState(StreamState state) async { - final recoveryCallback = _streamRecoveryCallbacks[state.streamId]; - if (recoveryCallback != null) { - DebugLogger.stream( - 'PersistentStreaming: Recovering stream from saved state: ${state.streamId}', - ); - await _attemptStreamRecovery(state.streamId, recoveryCallback); - } - } - - Future _attemptStreamRecovery( - String streamId, - Function recoveryCallback, - ) async { - final attempts = _retryAttempts[streamId] ?? 0; - if (attempts >= _maxRetryAttempts) { - DebugLogger.warning( - 'PersistentStreaming: Max retry attempts reached for stream $streamId', - ); - return; - } - - DebugLogger.stream( - 'PersistentStreaming: Recovering stream $streamId (attempt ${attempts + 1})', - ); - - try { - _retryAttempts[streamId] = attempts + 1; - - // Add exponential backoff delay - if (attempts > 0) { - final delay = _retryDelay * (1 << (attempts - 1)); // 2s, 4s, 8s... - await Future.delayed(delay); - } - - // Call recovery callback to restart the stream - await recoveryCallback(); - - // Reset retry count on success - _retryAttempts.remove(streamId); - } catch (e) { - DebugLogger.error( - 'recover-failed', - scope: 'streaming/persistent', - error: e, - data: {'streamId': streamId}, - ); - - // Schedule next retry if under limit - if (_retryAttempts[streamId]! < _maxRetryAttempts) { - Timer( - _retryDelay, - () => _attemptStreamRecovery(streamId, recoveryCallback), - ); - } - } - } - - bool _needsRecovery(String streamId) { - final metadata = _streamMetadata[streamId]; - if (metadata == null) return false; - - // Check if stream is marked as suspended - if (metadata['suspended'] == true) { - final suspendedAt = metadata['suspendedAt'] as DateTime?; - if (suspendedAt != null) { - final timeSinceSuspend = DateTime.now().difference(suspendedAt); - // Try to recover suspended streams after 10 seconds - return timeSinceSuspend > const Duration(seconds: 10); - } - } - - // Check if stream has been inactive for too long - final lastUpdate = metadata['lastUpdate'] as DateTime?; - if (lastUpdate != null) { - final timeSinceUpdate = DateTime.now().difference(lastUpdate); - // In background: 90 seconds - // In foreground: 2 minutes - final threshold = _isInBackground - ? const Duration(seconds: 90) - : const Duration(minutes: 2); - return timeSinceUpdate > threshold; - } - - return false; - } - - // Platform-specific background execution - void _startBackgroundExecution() { - if (_activeStreams.isNotEmpty) { - _backgroundHandler.startBackgroundExecution(_activeStreams.keys.toList()); - } - } - - void _markStreamAsSuspended(String streamId) { - final metadata = _streamMetadata[streamId]; - if (metadata != null) { - metadata['suspended'] = true; - metadata['suspendedAt'] = DateTime.now(); - } - } - - void _suspendAllStreams() { - for (final streamId in _activeStreams.keys) { - _markStreamAsSuspended(streamId); - } - } - - void _saveStreamStatesForRecovery() { - if (_activeStreams.isEmpty) { - DebugLogger.stream( - 'PersistentStreaming: No active streams to save for recovery', - ); - return; - } - - DebugLogger.stream( - 'PersistentStreaming: Saving ${_activeStreams.length} stream states for recovery', - ); - - // Actually save the stream states through the background handler - final streamIds = _activeStreams.keys.toList(); - _backgroundHandler.saveStreamStatesForRecovery(streamIds, 'app_detached'); - } - - // Update stream metadata when chunks are received - void updateStreamProgress( - String streamId, { - int? chunkSequence, - String? content, - String? appendedContent, - }) { - // Update background handler state - _backgroundHandler.updateStreamState( - streamId, - chunkSequence: chunkSequence, - content: content, - appendedContent: appendedContent, - ); - - // Update local metadata - final metadata = _streamMetadata[streamId]; - if (metadata != null) { - metadata['lastUpdate'] = DateTime.now(); - metadata['lastChunkSequence'] = - chunkSequence ?? metadata['lastChunkSequence']; - if (appendedContent != null) { - metadata['lastContent'] = - (metadata['lastContent'] ?? '') + appendedContent; - } else if (content != null) { - metadata['lastContent'] = content; - } - metadata['suspended'] = false; // Mark as active - } - } - - // Wake lock management - void _enableWakeLock() async { - try { - await WakelockPlus.enable(); - DebugLogger.stream('wake-lock-enabled', scope: 'streaming/persistent'); - } catch (e) { - DebugLogger.error( - 'wake-lock-enable-failed', - scope: 'streaming/persistent', - error: e, - ); - } - } - - void _disableWakeLock() async { - try { - await WakelockPlus.disable(); - DebugLogger.stream('wake-lock-disabled', scope: 'streaming/persistent'); - } catch (e) { - DebugLogger.error( - 'wake-lock-disable-failed', - scope: 'streaming/persistent', - error: e, - ); - } - } - - // Get active stream count - int get activeStreamCount => _activeStreams.length; - - // Check if app is in background - bool get isInBackground => _isInBackground; - - // Get stream metadata - Map? getStreamMetadata(String streamId) { - return _streamMetadata[streamId]; - } - - // Check if stream is suspended - bool isStreamSuspended(String streamId) { - final metadata = _streamMetadata[streamId]; - return metadata?['suspended'] == true; - } - - // Force recovery of a specific stream - Future forceRecoverStream(String streamId) async { - final recoveryCallback = _streamRecoveryCallbacks[streamId]; - if (recoveryCallback != null) { - _retryAttempts.remove(streamId); // Reset retry count - await _attemptStreamRecovery(streamId, recoveryCallback); - } - } - - // Cleanup - void dispose() { - WidgetsBinding.instance.removeObserver(this); - _backgroundTimer?.cancel(); - _heartbeatTimer?.cancel(); - _connectivitySubscription?.cancel(); - _disableWakeLock(); - - // Stop all background execution - if (_activeStreams.isNotEmpty) { - _backgroundHandler.stopBackgroundExecution(_activeStreams.keys.toList()); - } - - // Cancel all active streams - for (final subscription in _activeStreams.values) { - subscription.cancel(); - } - _activeStreams.clear(); - - // Close all controllers - for (final controller in _streamControllers.values) { - if (!controller.isClosed) { - controller.close(); - } - } - _streamControllers.clear(); - - // Clear all metadata - _streamMetadata.clear(); - _streamRecoveryCallbacks.clear(); - _retryAttempts.clear(); - - // Clear background handler - _backgroundHandler.clearAll(); - } -} diff --git a/lib/core/services/socket_service.dart b/lib/core/services/socket_service.dart index fe7d773..7e8bf26 100644 --- a/lib/core/services/socket_service.dart +++ b/lib/core/services/socket_service.dart @@ -1,3 +1,5 @@ +import 'dart:async'; + import 'package:flutter/widgets.dart'; import 'package:socket_io_client/socket_io_client.dart' as io; diff --git a/lib/core/services/sse_stream_parser.dart b/lib/core/services/sse_stream_parser.dart deleted file mode 100644 index fbcd2eb..0000000 --- a/lib/core/services/sse_stream_parser.dart +++ /dev/null @@ -1,202 +0,0 @@ -import 'dart:async'; -import 'dart:convert'; -import 'package:dio/dio.dart'; -import '../utils/debug_logger.dart'; - -/// Parser for Server-Sent Events (SSE) streaming responses. -/// -/// This matches the web client's EventSourceParserStream behavior, -/// parsing SSE data chunks and extracting OpenAI-compatible deltas. -class SSEStreamParser { - /// Parse an SSE response stream from Dio into text chunks. - /// - /// Returns a stream of content strings extracted from OpenAI-style - /// completion chunks. - /// - /// [heartbeatTimeout] - Maximum time without data before considering - /// the connection stale (default: 2 minutes) - /// [onHeartbeat] - Callback invoked when any data is received - static Stream parseResponseStream( - ResponseBody responseBody, { - bool splitLargeDeltas = false, - Duration heartbeatTimeout = const Duration(minutes: 2), - void Function()? onHeartbeat, - }) async* { - DateTime lastDataReceived = DateTime.now(); - Timer? heartbeatTimer; - - // Set up heartbeat monitoring - if (heartbeatTimeout.inMilliseconds > 0) { - heartbeatTimer = Timer.periodic(const Duration(seconds: 30), (timer) { - final timeSinceLastData = DateTime.now().difference(lastDataReceived); - if (timeSinceLastData > heartbeatTimeout) { - DebugLogger.warning( - 'SSE stream heartbeat timeout: No data received for ${timeSinceLastData.inSeconds}s', - data: {'timeout': heartbeatTimeout.inSeconds}, - ); - timer.cancel(); - } - }); - } - - try { - // Buffer for accumulating incomplete SSE messages - String buffer = ''; - - await for (final chunk in responseBody.stream) { - // Update last data timestamp and invoke heartbeat callback - lastDataReceived = DateTime.now(); - onHeartbeat?.call(); - - // Convert bytes to string (Dio ResponseBody.stream always emits Uint8List) - final text = utf8.decode(chunk as List, allowMalformed: true); - buffer += text; - - // Process complete SSE messages (delimited by double newline) - final messages = buffer.split('\n\n'); - - // Keep the last (potentially incomplete) message in the buffer - buffer = messages.removeLast(); - - for (final message in messages) { - if (message.trim().isEmpty) continue; - - // Parse SSE message - final content = _parseSSEMessage(message); - if (content != null) { - if (content == '[DONE]') { - // Stream completion signal - DebugLogger.stream('SSE stream completed with [DONE] signal'); - return; - } - - // Split large deltas into smaller chunks for smoother UI updates - if (splitLargeDeltas && content.length > 5) { - yield* _splitIntoChunks(content); - } else { - yield content; - } - } - } - } - - // Process any remaining buffered data - if (buffer.trim().isNotEmpty) { - final content = _parseSSEMessage(buffer); - if (content != null && content != '[DONE]') { - yield content; - } - } - } catch (e, stackTrace) { - DebugLogger.error( - 'sse-parse-error', - scope: 'streaming/sse', - error: e, - stackTrace: stackTrace, - ); - rethrow; - } finally { - // Clean up heartbeat timer - heartbeatTimer?.cancel(); - } - } - - /// Parse a single SSE message and extract content. - static String? _parseSSEMessage(String message) { - try { - // SSE format: "data: \n" or just the JSON - String dataLine = message.trim(); - - // Remove "data: " prefix if present - if (dataLine.startsWith('data: ')) { - dataLine = dataLine.substring(6).trim(); - } else if (dataLine.startsWith('data:')) { - dataLine = dataLine.substring(5).trim(); - } - - // Handle [DONE] signal - if (dataLine == '[DONE]' || dataLine == 'DONE') { - return '[DONE]'; - } - - // Skip empty data - if (dataLine.isEmpty) { - return null; - } - - // Parse JSON - try { - final json = jsonDecode(dataLine) as Map; - - // Handle errors - if (json['error'] != null) { - DebugLogger.error( - 'sse-error-response', - scope: 'streaming/sse', - error: json['error'], - ); - return null; - } - - // Extract content from OpenAI-style response - // Format: { choices: [{ delta: { content: "..." } }] } - final choices = json['choices']; - if (choices is List && choices.isNotEmpty) { - final choice = choices.first as Map?; - if (choice != null) { - final delta = choice['delta'] as Map?; - if (delta != null) { - final content = delta['content']; - if (content is String && content.isNotEmpty) { - return content; - } - } - } - } - - // Alternative format: { content: "..." } - final directContent = json['content']; - if (directContent is String && directContent.isNotEmpty) { - return directContent; - } - - return null; - } on FormatException catch (e) { - DebugLogger.warning( - 'Failed to parse SSE JSON: $dataLine', - data: {'error': e.toString()}, - ); - return null; - } - } catch (e) { - DebugLogger.error( - 'sse-message-parse-error', - scope: 'streaming/sse', - error: e, - data: {'message': message}, - ); - return null; - } - } - - /// Split large content into smaller chunks for smoother streaming. - /// This matches the web client's streamLargeDeltasAsRandomChunks behavior. - static Stream _splitIntoChunks(String content) async* { - var remaining = content; - - while (remaining.isNotEmpty) { - // Random chunk size between 1-3 characters - final chunkSize = (remaining.length < 3) - ? remaining.length - : 1 + (DateTime.now().millisecond % 3); - - final chunk = remaining.substring(0, chunkSize); - yield chunk; - - // Small delay for smoother visual effect (matching web client) - await Future.delayed(const Duration(milliseconds: 5)); - - remaining = remaining.substring(chunkSize); - } - } -} diff --git a/lib/core/services/streaming_helper.dart b/lib/core/services/streaming_helper.dart index c59af2c..d450d0d 100644 --- a/lib/core/services/streaming_helper.dart +++ b/lib/core/services/streaming_helper.dart @@ -5,7 +5,6 @@ import 'package:flutter/material.dart'; import '../../core/models/chat_message.dart'; import '../../core/models/socket_event.dart'; -import '../../core/services/persistent_streaming_service.dart'; import '../../core/services/socket_service.dart'; import '../../core/utils/inactivity_watchdog.dart'; import '../../core/utils/tool_calls_parser.dart'; @@ -163,86 +162,44 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ required void Function() finishStreaming, required List Function() getMessages, }) { - // Persistable controller to survive brief app suspensions + // Track if streaming has been finished to avoid duplicate cleanup + bool hasFinished = false; + + // Wrap finishStreaming to always clear the cancel token + void wrappedFinishStreaming() { + if (hasFinished) return; + hasFinished = true; + api.clearStreamCancelToken(assistantMessageId); + finishStreaming(); + } + + // Controller for forwarding data to StreamingResponseController + // With WebSocket-only streaming, the HTTP stream closes immediately after returning task_id. + // All actual content comes via WebSocket events, so we don't need persistent stream tracking. final persistentController = StreamController.broadcast(); - final persistentService = PersistentStreamingService(); - // Track if stream has received any data - bool hasReceivedData = false; - - // Create subscription first so we can reference it in onDone - late final String streamId; - final subscription = stream.listen( + // Subscribe to HTTP stream (mainly for error handling - content comes via WebSocket) + final httpSubscription = stream.listen( (data) { - hasReceivedData = true; + // Forward any HTTP stream data (rare with WebSocket-only) persistentController.add(data); }, - onDone: () async { + onDone: () { DebugLogger.stream( - 'Source stream onDone fired, hasReceivedData=$hasReceivedData', + 'HTTP stream completed - WebSocket handles content delivery', ); - - // If stream closes immediately without data, it's likely due to backgrounding/network drop - // Not a natural completion - if (!hasReceivedData) { - DebugLogger.stream( - 'Stream closed without data - likely interrupted, not completing', - ); - // Check if app is backgrounding - if so, finish streaming with whatever we have - await Future.delayed(const Duration(milliseconds: 300)); - if (persistentService.isInBackground) { - DebugLogger.stream( - 'App backgrounding during stream - finishing with current content', - ); - finishStreaming(); - } - // Don't close the controller to prevent cascading completion handlers - return; - } - - // For streams with data, delay to allow background detection - await Future.delayed(const Duration(milliseconds: 500)); - - final isInBg = persistentService.isInBackground; - DebugLogger.stream( - 'Stream onDone check: streamId=$streamId, isInBackground=$isInBg', - ); - - // Check if we're in background before closing - if (!isInBg) { - DebugLogger.stream( - 'Closing stream controller for $streamId (foreground completion)', - ); + // Close the controller to trigger StreamingResponseController.onComplete + // WebSocket events continue independently via socket subscriptions + if (!persistentController.isClosed) { persistentController.close(); - } else { - DebugLogger.stream( - 'Source stream completed in background for $streamId - keeping open for recovery', - ); - // Finish streaming to save the content we have - finishStreaming(); } }, onError: persistentController.addError, ); - streamId = persistentService.registerStream( - subscription: subscription, - controller: persistentController, - recoveryCallback: () async { - DebugLogger.log( - 'Attempting to recover interrupted stream', - scope: 'streaming/helper', - ); - }, - metadata: { - 'conversationId': activeConversationId, - 'messageId': assistantMessageId, - 'modelId': modelId, - }, - ); - api.registerPersistentStreamForMessage(assistantMessageId, streamId); - InactivityWatchdog? socketWatchdog; + // Socket subscriptions list - starts empty so non-socket flows can finish via onComplete. + // HTTP subscription is tracked separately and cleaned up in disposeSocketSubscriptions. final socketSubscriptions = []; final hasSocketSignals = socketService != null || registerDeltaListener != null; @@ -268,7 +225,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ if (msgs.isNotEmpty && msgs.last.role == 'assistant' && msgs.last.isStreaming) { - finishStreaming(); + wrappedFinishStreaming(); } } catch (_) {} socketWatchdog?.stop(); @@ -284,15 +241,19 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ int imageCollectionRequestId = 0; void disposeSocketSubscriptions() { - if (socketSubscriptions.isEmpty) { - return; - } + // Cancel HTTP subscription + try { + httpSubscription.cancel(); + } catch (_) {} + + // Cancel socket subscriptions for (final dispose in socketSubscriptions) { try { dispose(); } catch (_) {} } socketSubscriptions.clear(); + imageCollectionDebounce?.cancel(); imageCollectionDebounce = null; pendingImageContent = null; @@ -481,7 +442,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ sessionId: sessionId, ); } catch (_) {} - finishStreaming(); + wrappedFinishStreaming(); socketWatchdog?.stop(); return; } @@ -502,7 +463,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ sessionId: sessionId, ); } catch (_) {} - finishStreaming(); + wrappedFinishStreaming(); socketWatchdog?.stop(); return; } @@ -563,7 +524,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ try { socketService?.offEvent(channel); } catch (_) {} - finishStreaming(); + wrappedFinishStreaming(); socketWatchdog?.stop(); return; } @@ -781,13 +742,13 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ } } catch (_) { } finally { - finishStreaming(); + wrappedFinishStreaming(); } }); return; } } - finishStreaming(); + wrappedFinishStreaming(); socketWatchdog?.stop(); } } @@ -816,7 +777,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ }); } disposeSocketSubscriptions(); - finishStreaming(); + wrappedFinishStreaming(); } else if (type == 'chat:message:follow_ups' && payload != null) { DebugLogger.log('Received follow-ups event', scope: 'streaming/helper'); final followMap = _asStringMap(payload); @@ -966,7 +927,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ return message.copyWith(statusHistory: filtered); }); // Ensure UI exits streaming state - finishStreaming(); + wrappedFinishStreaming(); socketWatchdog?.stop(); } else if ((type == 'chat:message:delta' || type == 'message') && payload != null) { @@ -1246,17 +1207,11 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ } }, onComplete: () { - api.clearPersistentStreamForMessage( - assistantMessageId, - expectedStreamId: streamId, - ); - // Unregister from persistent service - persistentService.unregisterStream(streamId); - - // Stream completion without socket subscriptions indicates a simple flow - // For WebSocket flows, completion should be handled by socket events (done: true) + // HTTP stream completed - cleanup already done in onDone handler. + // For WebSocket flows, actual completion is handled by socket events (done: true). + // Only finish streaming here if there are no socket subscriptions (simple/legacy flow). if (socketSubscriptions.isEmpty) { - finishStreaming(); + wrappedFinishStreaming(); Future.microtask(refreshConversationSnapshot); } }, @@ -1272,14 +1227,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ }, ); - api.clearPersistentStreamForMessage( - assistantMessageId, - expectedStreamId: streamId, - ); - try { - persistentService.unregisterStream(streamId); - } catch (_) {} - // Check if this is a recoverable error (network issues, etc.) final errorText = error.toString(); final isRecoverable = @@ -1303,7 +1250,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ } disposeSocketSubscriptions(); - finishStreaming(); + wrappedFinishStreaming(); Future.microtask(refreshConversationSnapshot); socketWatchdog?.stop(); }, diff --git a/lib/core/services/streaming_response_controller.dart b/lib/core/services/streaming_response_controller.dart index d007ee9..13015fe 100644 --- a/lib/core/services/streaming_response_controller.dart +++ b/lib/core/services/streaming_response_controller.dart @@ -17,7 +17,7 @@ typedef StreamingErrorCallback = /// /// This wraps a [StreamSubscription], normalises error handling, and exposes /// a unified cancel method so UI layers can stop streaming without having to -/// know the underlying transport (SSE, polling, etc.). +/// know the underlying transport (WebSocket, polling, etc.). class StreamingResponseController { StreamingResponseController({ required Stream stream, diff --git a/lib/features/chat/providers/chat_providers.dart b/lib/features/chat/providers/chat_providers.dart index 44b40bc..320d7d5 100644 --- a/lib/features/chat/providers/chat_providers.dart +++ b/lib/features/chat/providers/chat_providers.dart @@ -34,6 +34,15 @@ final chatMessagesProvider = ChatMessagesNotifier.new, ); +/// Whether chat is currently streaming a response. +/// Used by router to avoid showing connection issues during active streaming. +final isChatStreamingProvider = Provider((ref) { + final messages = ref.watch(chatMessagesProvider); + if (messages.isEmpty) return false; + final last = messages.last; + return last.role == 'assistant' && last.isStreaming; +}); + // Loading state for conversation (used to show chat skeletons during fetch) @Riverpod(keepAlive: true) class IsLoadingConversation extends _$IsLoadingConversation { @@ -1381,24 +1390,45 @@ Future regenerateMessage( 'tags': [], }; - // Socket binding for background flows + // WebSocket-only streaming requires socket connection final socketService = ref.read(socketServiceProvider); - String? socketSessionId = socketService?.sessionId; - bool wantSessionBinding = - (socketService?.isConnected == true) && - (socketSessionId != null && socketSessionId.isNotEmpty); - // When regenerating with tools, make a best-effort to ensure a live socket. - if (!wantSessionBinding && socketService != null) { - try { - final ok = await socketService.ensureConnected(); - if (ok) { - socketSessionId = socketService.sessionId; - wantSessionBinding = - socketSessionId != null && socketSessionId.isNotEmpty; - } - } catch (_) {} + if (socketService == null) { + // No socket service available + ref.read(chatMessagesProvider.notifier).updateLastMessageWithFunction(( + m, + ) { + return m.copyWith( + content: 'Connection not available. Please try again later.', + isStreaming: false, + ); + }); + return; } + // Ensure socket is connected (with 10s timeout) + if (!socketService.isConnected) { + final connected = await socketService.ensureConnected( + timeout: const Duration(seconds: 10), + ); + if (!connected) { + ref.read(chatMessagesProvider.notifier).updateLastMessageWithFunction(( + m, + ) { + return m.copyWith( + content: + 'Unable to connect to server. Please check your connection and try again.', + isStreaming: false, + ); + }); + return; + } + } + + final socketSessionId = socketService.sessionId; + final bool wantSessionBinding = + socketService.isConnected && + (socketSessionId != null && socketSessionId.isNotEmpty); + // Resolve tool servers from user settings (if any) List>? toolServers; final uiSettings = userSettingsData?['ui'] as Map?; @@ -1963,12 +1993,46 @@ Future _sendMessageInternal( 'tags': [], }; - // Stream response using server-push via Socket when available, otherwise fallback - // Resolve Socket session for background tasks parity + // WebSocket-only streaming requires socket connection. + // Wait for connection with timeout before proceeding. final socketService = ref.read(socketServiceProvider); - final socketSessionId = socketService?.sessionId; + if (socketService == null) { + // No socket service available at all + ref.read(chatMessagesProvider.notifier).updateLastMessageWithFunction(( + m, + ) { + return m.copyWith( + content: 'Connection not available. Please try again later.', + isStreaming: false, + ); + }); + return; + } + + // Ensure socket is connected (with 10s timeout for initial connection) + if (!socketService.isConnected) { + final connected = await socketService.ensureConnected( + timeout: const Duration(seconds: 10), + ); + if (!connected) { + // Socket connection failed - cannot stream without it + ref.read(chatMessagesProvider.notifier).updateLastMessageWithFunction(( + m, + ) { + return m.copyWith( + content: + 'Unable to connect to server. Please check your connection and try again.', + isStreaming: false, + ); + }); + return; + } + } + + // Socket is now connected - resolve session for background tasks parity + final socketSessionId = socketService.sessionId; final bool wantSessionBinding = - (socketService?.isConnected == true) && + socketService.isConnected && (socketSessionId != null && socketSessionId.isNotEmpty); // Resolve tool servers from user settings (if any) @@ -2045,7 +2109,7 @@ Future _sendMessageInternal( final effectiveSessionId = response.socketSessionId ?? socketSessionId ?? sessionId; - // Use unified streaming helper for SSE/WebSocket handling + // Use unified streaming helper for WebSocket handling final bool isBackgroundFlow = response.isBackgroundFlow; try { @@ -2500,7 +2564,7 @@ final stopGenerationProvider = Provider((ref) { messages.last.isStreaming) { final lastId = messages.last.id; - // Cancel the network stream (SSE) if active + // Cancel the network stream if active final api = ref.read(apiServiceProvider); api?.cancelStreamingMessage(lastId); diff --git a/lib/features/chat/views/chat_page.dart b/lib/features/chat/views/chat_page.dart index 9bfcbf4..a888cb8 100644 --- a/lib/features/chat/views/chat_page.dart +++ b/lib/features/chat/views/chat_page.dart @@ -1254,7 +1254,7 @@ class _ChatPageState extends ConsumerState { try { // If assistant message has generated images and it's the last message, - // use image-only regenerate flow instead of text SSE regeneration + // use image-only regenerate flow instead of text streaming regeneration if (message.role == 'assistant' && (message.files?.any((f) => f['type'] == 'image') == true) && messageIndex == messages.length - 1) {