feat(streaming): Simplify streaming logic and remove persistent tracking

This commit is contained in:
cogwheel0
2025-11-27 14:36:13 +05:30
parent e6f8a76f13
commit 61a3fcc83a
11 changed files with 181 additions and 1066 deletions

View File

@@ -13,7 +13,6 @@ import '../services/app_intents_service.dart';
import '../services/quick_actions_service.dart';
import '../models/conversation.dart';
import '../services/background_streaming_handler.dart';
import '../services/persistent_streaming_service.dart';
import '../services/socket_service.dart';
import '../../features/onboarding/views/onboarding_sheet.dart';
import '../../features/chat/providers/chat_providers.dart';
@@ -198,15 +197,6 @@ class AppStartupFlow extends _$AppStartupFlow {
keepAlive(socketPersistenceProvider);
});
// Ensure persistent streaming uses the shared connectivity service
final connectivityService = ref.read(connectivityServiceProvider);
Future<void>.delayed(const Duration(milliseconds: 160), () {
if (!ref.mounted) return;
PersistentStreamingService().attachConnectivityService(
connectivityService,
);
});
// Warm the conversations list in the background as soon as possible,
// but avoid doing so on poor connectivity to reduce startup load.
// Apply a small randomized delay to smooth load spikes across app wakes.
@@ -432,7 +422,7 @@ class _ForegroundRefreshObserver extends WidgetsBindingObserver {
}
/// Attempts to keep the realtime socket connection alive while the app is
/// backgrounded, similar to how PersistentStreamingService works for streams.
/// backgrounded using BackgroundStreamingHandler for platform-specific handling.
///
/// Notes:
/// - iOS: limited to short background task windows; we send periodic keepAlive.

View File

@@ -7,9 +7,9 @@ import '../auth/auth_state_manager.dart';
import '../providers/app_providers.dart';
import '../services/connectivity_service.dart';
import '../services/navigation_service.dart';
import '../services/persistent_streaming_service.dart';
import '../utils/debug_logger.dart';
import '../../features/auth/providers/unified_auth_providers.dart';
import '../../features/chat/providers/chat_providers.dart';
import '../../features/auth/views/authentication_page.dart';
import '../../features/auth/views/connect_signin_page.dart';
import '../../features/auth/views/connection_issue_page.dart';
@@ -37,6 +37,7 @@ class RouterNotifier extends ChangeNotifier {
connectivityStatusProvider,
_onStateChanged,
),
ref.listen<bool>(isChatStreamingProvider, _onStateChanged),
];
}
@@ -112,8 +113,8 @@ class RouterNotifier extends ChangeNotifier {
// 2. Connectivity is explicitly offline
// 3. Auth is authenticated (don't interrupt auth flow)
// 4. App is in foreground and offline warning isn't suppressed
// 5. No active streaming is in progress (avoid interrupting token streams)
final hasActiveStreams = PersistentStreamingService().activeStreamCount > 0;
// 5. No active streaming is in progress (avoid interrupting chat streams)
final hasActiveStreams = ref.read(isChatStreamingProvider);
final shouldShowConnectionIssue =
!reviewerMode &&
connectivity == ConnectivityStatus.offline &&

View File

@@ -19,9 +19,7 @@ import '../models/prompt.dart';
import '../auth/api_auth_interceptor.dart';
import '../error/api_error_interceptor.dart';
// Tool-call details are parsed in the UI layer to render collapsible blocks
import 'persistent_streaming_service.dart';
import 'connectivity_service.dart';
import 'sse_stream_parser.dart';
import '../utils/debug_logger.dart';
import 'conversation_parsing.dart';
import 'worker_manager.dart';
@@ -2596,36 +2594,12 @@ class ApiService {
// Chat streaming with conversation context
// Track cancellable streaming requests by messageId for stop parity
final Map<String, CancelToken> _streamCancelTokens = {};
final Map<String, String> _messagePersistentStreamIds = {};
/// Associates a streaming message with its persistent stream identifier.
void registerPersistentStreamForMessage(String messageId, String streamId) {
_messagePersistentStreamIds[messageId] = streamId;
}
/// Removes the persistent stream mapping for a message if it matches.
///
/// Returns the removed persistent stream identifier when one existed and
/// matched the optional [expectedStreamId].
String? clearPersistentStreamForMessage(
String messageId, {
String? expectedStreamId,
}) {
final current = _messagePersistentStreamIds[messageId];
if (current == null) {
return null;
}
if (expectedStreamId != null && current != expectedStreamId) {
return null;
}
return _messagePersistentStreamIds.remove(messageId);
}
// Send message using dual-stream approach (HTTP SSE + WebSocket events).
// Matches OpenWebUI web client behavior:
// - HTTP SSE stream provides immediate content chunks
// - WebSocket events deliver metadata, tool status, sources, follow-ups
// - Both streams run in parallel for reliability
// Send message using WebSocket-only streaming.
// Matches OpenWebUI web client behavior when session_id + chat_id + message_id are provided:
// - HTTP POST returns JSON with task_id (no SSE streaming)
// - All content and metadata delivered via WebSocket events
// - Events: chat:completion, chat:message:delta, status, source, follow_ups, etc.
// Returns a record with (stream, messageId, sessionId, socketSessionId, isBackgroundFlow)
({
Stream<String> stream,
@@ -2790,7 +2764,7 @@ class ApiService {
_traceApi('Including non-image files in request: ${allFiles.length}');
}
_traceApi('Preparing dual-stream chat request (HTTP SSE + WebSocket)');
_traceApi('Preparing WebSocket-only chat request');
_traceApi('Model: $model');
_traceApi('Message count: ${processedMessages.length}');
@@ -2830,118 +2804,85 @@ class ApiService {
);
_traceApi('Has background_tasks: ${data.containsKey('background_tasks')}');
_traceApi('Initiating dual-stream request (HTTP SSE + WebSocket)');
_traceApi('Initiating WebSocket-only chat request');
_traceApi('Posting to /api/chat/completions');
// Create a cancel token for this request
final cancelToken = CancelToken();
_streamCancelTokens[messageId] = cancelToken;
// Start HTTP SSE stream (matches web client behavior)
// The WebSocket events will run in parallel via streaming_helper.dart
// Send HTTP request to initiate chat task
// With session_id + chat_id + message_id, the server returns a task_id
// and all streaming happens via WebSocket events (not SSE)
() async {
try {
final resp = await _dio.post(
'/api/chat/completions',
data: data,
options: Options(
responseType: ResponseType.stream,
// Extended timeout for streaming responses - allow up to 10 minutes
// for long-running tool calls and reasoning
receiveTimeout: const Duration(minutes: 10),
// Shorter send timeout for the initial request
responseType: ResponseType.json,
receiveTimeout: const Duration(seconds: 30),
sendTimeout: const Duration(seconds: 30),
headers: {
'Accept': 'text/event-stream',
// Enable HTTP keep-alive to maintain connection in background
'Connection': 'keep-alive',
// Request server to send keep-alive messages
'Cache-Control': 'no-cache',
},
),
cancelToken: cancelToken,
);
final respData = resp.data;
// Check if we got a task_id response (non-streaming)
if (respData is Map && respData['task_id'] != null) {
final taskId = respData['task_id'].toString();
_traceApi('Background task created: $taskId');
// In this case, all streaming will happen via WebSocket
// Close HTTP stream but keep WebSocket active
if (!streamController.isClosed) {
streamController.close();
}
return;
}
// We have a streaming response body
if (respData is ResponseBody) {
_traceApi('HTTP SSE stream started for message: $messageId');
// Parse SSE stream and forward chunks to controller
await for (final chunk in SSEStreamParser.parseResponseStream(
respData,
splitLargeDeltas: false,
heartbeatTimeout: const Duration(minutes: 2),
onHeartbeat: () {
// Notify persistent streaming service that connection is alive
final persistentStreamId = _messagePersistentStreamIds[messageId];
if (persistentStreamId != null) {
PersistentStreamingService().updateStreamProgress(
persistentStreamId,
chunkSequence: DateTime.now().millisecondsSinceEpoch,
);
}
},
)) {
if (respData is Map) {
if (respData['task_id'] != null) {
final taskId = respData['task_id'].toString();
_traceApi('Background task created: $taskId');
} else if (respData['status'] == true) {
_traceApi('Chat task initiated successfully');
} else if (respData['error'] != null) {
_traceApi('Server error: ${respData['error']}');
if (!streamController.isClosed) {
streamController.add(chunk);
} else {
_traceApi('Stream controller closed, stopping SSE parsing');
break;
streamController.addError(Exception(respData['error'].toString()));
}
}
_traceApi('HTTP SSE stream completed for message: $messageId');
} else {
_traceApi('Unexpected response type: ${respData.runtimeType}');
}
// Close the HTTP stream controller
// WebSocket events will continue independently via streaming_helper
// Close HTTP stream controller - WebSocket handles all content delivery
if (!streamController.isClosed) {
streamController.close();
}
} on DioException catch (e) {
if (CancelToken.isCancel(e)) {
_traceApi('HTTP stream cancelled for message: $messageId');
_traceApi('Request cancelled for message: $messageId');
} else {
_traceApi('HTTP stream error: $e');
_traceApi('Request error: $e');
if (!streamController.isClosed) {
streamController.addError(e);
streamController.close();
}
}
} catch (e) {
_traceApi('Unexpected error in HTTP stream: $e');
_traceApi('Unexpected error: $e');
if (!streamController.isClosed) {
streamController.addError(e);
streamController.close();
}
} finally {
_streamCancelTokens.remove(messageId);
}
// Note: Don't remove cancel token here - it should remain until WebSocket
// streaming finishes so Stop button can cancel the active generation.
// Token is removed by clearStreamCancelToken() when streaming completes.
}();
// Determine if this is actually a background flow based on the request payload
final bool isBackgroundFlow =
hasBackgroundTasksPayload ||
(toolIds != null && toolIds.isNotEmpty) ||
(toolServers != null && toolServers.isNotEmpty) ||
enableWebSearch ||
enableImageGeneration;
return (
stream: streamController.stream,
messageId: messageId,
sessionId: sessionId,
socketSessionId: socketSessionId,
isBackgroundFlow: true,
isBackgroundFlow: isBackgroundFlow,
);
}
@@ -2975,13 +2916,12 @@ class ApiService {
token.cancel('User cancelled');
}
} catch (_) {}
}
try {
final pid = clearPersistentStreamForMessage(messageId);
if (pid != null) {
PersistentStreamingService().unregisterStream(pid);
}
} catch (_) {}
/// Clears the cancel token for a message when streaming completes normally.
/// Called by streaming_helper when finishStreaming is invoked.
void clearStreamCancelToken(String messageId) {
_streamCancelTokens.remove(messageId);
}
// File upload for RAG

View File

@@ -1,627 +0,0 @@
import 'dart:async';
import 'package:flutter/material.dart';
import 'package:wakelock_plus/wakelock_plus.dart';
import 'background_streaming_handler.dart';
import 'connectivity_service.dart';
import '../utils/debug_logger.dart';
class PersistentStreamingService with WidgetsBindingObserver {
static final PersistentStreamingService _instance =
PersistentStreamingService._internal();
factory PersistentStreamingService() => _instance;
PersistentStreamingService._internal() {
_initialize();
}
// Active streams registry
final Map<String, StreamSubscription> _activeStreams = {};
final Map<String, StreamController> _streamControllers = {};
final Map<String, Function> _streamRecoveryCallbacks = {};
final Map<String, Map<String, dynamic>> _streamMetadata = {};
// App lifecycle state
// AppLifecycleState? _lastLifecycleState; // Removed as it's unused
bool _isInBackground = false;
Timer? _backgroundTimer;
Timer? _heartbeatTimer;
// Background streaming handler
late final BackgroundStreamingHandler _backgroundHandler;
// Connectivity monitoring
StreamSubscription<bool>? _connectivitySubscription;
ConnectivityService? _connectivityService;
bool _hasConnectivity = true;
// Recovery state
final Map<String, int> _retryAttempts = {};
static const int _maxRetryAttempts = 3;
static const Duration _retryDelay = Duration(seconds: 2);
void _initialize() {
WidgetsBinding.instance.addObserver(this);
_backgroundHandler = BackgroundStreamingHandler.instance;
_setupBackgroundHandlerCallbacks();
_startHeartbeat();
}
void _setupBackgroundHandlerCallbacks() {
_backgroundHandler.onServiceFailed = (error, errorType, streamIds) {
DebugLogger.error(
'background-service-failed',
scope: 'streaming/persistent',
error: '$errorType: $error',
data: {'affectedStreams': streamIds},
);
// Attempt immediate recovery for failed streams
for (final streamId in streamIds) {
final callback = _streamRecoveryCallbacks[streamId];
if (callback != null) {
// Schedule recovery after a short delay
Future.delayed(const Duration(seconds: 2), () {
if (_activeStreams.containsKey(streamId)) {
_attemptStreamRecovery(streamId, callback);
}
});
}
}
};
_backgroundHandler.onStreamsSuspending = (streamIds) {
DebugLogger.stream(
'PersistentStreaming: Streams suspending - $streamIds',
);
// Mark streams as suspended but don't close them yet
for (final streamId in streamIds) {
_markStreamAsSuspended(streamId);
}
};
_backgroundHandler.onBackgroundTaskExpiring = () {
DebugLogger.stream('PersistentStreaming: Background task expiring');
// Save states and prepare for recovery
_saveStreamStatesForRecovery();
};
_backgroundHandler
.onBackgroundTaskExtended = (streamIds, estimatedSeconds) {
DebugLogger.stream(
'PersistentStreaming: Background task extended for $estimatedSeconds seconds',
);
// BGTaskScheduler has given us more time - streams can continue
for (final streamId in streamIds) {
final metadata = _streamMetadata[streamId];
if (metadata != null) {
metadata['bgTaskExtended'] = true;
metadata['bgTaskExtendedAt'] = DateTime.now();
metadata['bgTaskEstimatedTime'] = estimatedSeconds;
}
}
};
_backgroundHandler.onBackgroundKeepAlive = () {
DebugLogger.stream('PersistentStreaming: Background keep-alive signal');
// BGTaskScheduler is keeping us alive - we can continue streaming
_heartbeatTimer?.cancel();
_startHeartbeat(); // Restart heartbeat timer
};
_backgroundHandler.shouldContinueInBackground = () {
return _activeStreams.isNotEmpty;
};
}
void attachConnectivityService(ConnectivityService service) {
if (identical(_connectivityService, service)) {
return;
}
_connectivitySubscription?.cancel();
_connectivityService = service;
_connectivitySubscription = service.statusStream
.map((status) => status == ConnectivityStatus.online)
.listen(_handleConnectivityChange);
}
void _handleConnectivityChange(bool connected) {
final wasConnected = _hasConnectivity;
_hasConnectivity = connected;
if (!wasConnected && connected) {
DebugLogger.stream(
'PersistentStreaming: Connectivity restored, recovering streams',
);
_recoverActiveStreams();
} else if (wasConnected && !connected) {
DebugLogger.stream(
'PersistentStreaming: Connectivity lost, suspending streams',
);
_suspendAllStreams();
}
}
void _startHeartbeat() {
_heartbeatTimer = Timer.periodic(const Duration(seconds: 30), (_) {
if (_activeStreams.isNotEmpty && _isInBackground) {
_backgroundHandler.keepAlive();
// Check for stale streams during background operation
_checkStreamHealth();
}
});
}
void _checkStreamHealth() {
final now = DateTime.now();
final staleStreams = <String>[];
for (final entry in _streamMetadata.entries) {
final streamId = entry.key;
final metadata = entry.value;
final lastUpdate = metadata['lastUpdate'] as DateTime?;
if (lastUpdate != null) {
final timeSinceUpdate = now.difference(lastUpdate);
// If no update in 90 seconds while in background, consider stale
if (timeSinceUpdate > const Duration(seconds: 90)) {
DebugLogger.warning(
'Stream $streamId appears stale: ${timeSinceUpdate.inSeconds}s since last update',
);
staleStreams.add(streamId);
}
}
}
// Attempt recovery for stale streams
for (final streamId in staleStreams) {
final callback = _streamRecoveryCallbacks[streamId];
if (callback != null && _retryAttempts[streamId] == null) {
DebugLogger.stream('Initiating recovery for stale stream: $streamId');
_attemptStreamRecovery(streamId, callback);
}
}
}
@override
void didChangeAppLifecycleState(AppLifecycleState state) {
// _lastLifecycleState = state; // Removed as it's unused
switch (state) {
case AppLifecycleState.paused:
case AppLifecycleState.inactive:
_onAppBackground();
break;
case AppLifecycleState.resumed:
_onAppForeground();
break;
case AppLifecycleState.detached:
case AppLifecycleState.hidden:
// Handle app termination
_onAppDetached();
break;
}
}
void _onAppBackground() {
DebugLogger.stream('PersistentStreamingService: App went to background');
_isInBackground = true;
// Enable wake lock to prevent device sleep during streaming
if (_activeStreams.isNotEmpty) {
_enableWakeLock();
_startBackgroundExecution();
}
}
void _onAppForeground() {
DebugLogger.stream(
'PersistentStreamingService: App returned to foreground',
);
_isInBackground = false;
// Cancel background timer
_backgroundTimer?.cancel();
_backgroundTimer = null;
// Disable wake lock if no active streams
if (_activeStreams.isEmpty) {
_disableWakeLock();
}
// Close any controllers for streams that were suspended in background
// This allows the onComplete handlers to fire now that we're in foreground
final suspendedStreams = _streamMetadata.entries
.where((e) => e.value['suspended'] == true)
.map((e) => e.key)
.toList();
for (final streamId in suspendedStreams) {
final controller = _streamControllers[streamId];
if (controller != null && !controller.isClosed) {
DebugLogger.stream(
'PersistentStreamingService: Closing suspended stream $streamId controller in foreground',
);
controller.close();
}
}
// Check and recover any interrupted streams
_recoverActiveStreams();
}
void _onAppDetached() {
DebugLogger.stream('PersistentStreamingService: App detached');
// Save stream states for recovery
_saveStreamStatesForRecovery();
// Clean up
_backgroundTimer?.cancel();
_heartbeatTimer?.cancel();
_disableWakeLock();
}
// Register a stream for persistent handling
String registerStream({
required StreamSubscription subscription,
required StreamController controller,
Function? recoveryCallback,
Map<String, dynamic>? metadata,
}) {
final streamId = DateTime.now().millisecondsSinceEpoch.toString();
_activeStreams[streamId] = subscription;
_streamControllers[streamId] = controller;
if (recoveryCallback != null) {
_streamRecoveryCallbacks[streamId] = recoveryCallback;
}
// Store metadata for recovery
if (metadata != null) {
_streamMetadata[streamId] = metadata;
// Register with background handler
_backgroundHandler.registerStream(
streamId,
conversationId: metadata['conversationId'] ?? '',
messageId: metadata['messageId'] ?? '',
sessionId: metadata['sessionId'],
lastChunkSequence: metadata['lastChunkSequence'],
lastContent: metadata['lastContent'],
);
}
// Enable wake lock when streaming starts
if (_activeStreams.length == 1) {
_enableWakeLock();
}
// Start background execution if app is backgrounded
if (_isInBackground) {
_startBackgroundExecution();
}
DebugLogger.stream(
'PersistentStreamingService: Registered stream $streamId',
);
return streamId;
}
// Unregister a stream
void unregisterStream(String streamId, {bool saveForRecovery = false}) {
// If app is in background and stream is unregistering, it might be due to
// network interruption - save state for recovery instead of just dropping it
if (_isInBackground &&
!saveForRecovery &&
_streamMetadata.containsKey(streamId)) {
DebugLogger.stream(
'PersistentStreamingService: Stream $streamId interrupted in background, saving for recovery',
);
// Don't unregister yet - keep it for recovery
_markStreamAsSuspended(streamId);
return;
}
_activeStreams.remove(streamId);
_streamControllers.remove(streamId);
_streamRecoveryCallbacks.remove(streamId);
_streamMetadata.remove(streamId);
_retryAttempts.remove(streamId);
// Unregister from background handler
_backgroundHandler.unregisterStream(streamId);
// Stop background execution if no more streams
if (_activeStreams.isEmpty) {
_backgroundHandler.stopBackgroundExecution([streamId]);
_disableWakeLock();
}
DebugLogger.stream(
'PersistentStreamingService: Unregistered stream $streamId',
);
}
// Check if a stream is still active
bool isStreamActive(String streamId) {
return _activeStreams.containsKey(streamId);
}
// Recover interrupted streams
Future<void> _recoverActiveStreams() async {
if (!_hasConnectivity) {
DebugLogger.stream(
'PersistentStreaming: No connectivity, skipping recovery',
);
return;
}
// First, try to recover from background handler saved states
final savedStates = await _backgroundHandler.recoverStreamStates();
for (final state in savedStates) {
if (!state.isStale()) {
await _recoverStreamFromState(state);
}
}
// Then check active streams for recovery
for (final entry in _streamRecoveryCallbacks.entries) {
final streamId = entry.key;
final recoveryCallback = entry.value;
// Check if stream was interrupted or needs recovery
final subscription = _activeStreams[streamId];
if (subscription == null || _needsRecovery(streamId)) {
await _attemptStreamRecovery(streamId, recoveryCallback);
}
}
}
Future<void> _recoverStreamFromState(StreamState state) async {
final recoveryCallback = _streamRecoveryCallbacks[state.streamId];
if (recoveryCallback != null) {
DebugLogger.stream(
'PersistentStreaming: Recovering stream from saved state: ${state.streamId}',
);
await _attemptStreamRecovery(state.streamId, recoveryCallback);
}
}
Future<void> _attemptStreamRecovery(
String streamId,
Function recoveryCallback,
) async {
final attempts = _retryAttempts[streamId] ?? 0;
if (attempts >= _maxRetryAttempts) {
DebugLogger.warning(
'PersistentStreaming: Max retry attempts reached for stream $streamId',
);
return;
}
DebugLogger.stream(
'PersistentStreaming: Recovering stream $streamId (attempt ${attempts + 1})',
);
try {
_retryAttempts[streamId] = attempts + 1;
// Add exponential backoff delay
if (attempts > 0) {
final delay = _retryDelay * (1 << (attempts - 1)); // 2s, 4s, 8s...
await Future.delayed(delay);
}
// Call recovery callback to restart the stream
await recoveryCallback();
// Reset retry count on success
_retryAttempts.remove(streamId);
} catch (e) {
DebugLogger.error(
'recover-failed',
scope: 'streaming/persistent',
error: e,
data: {'streamId': streamId},
);
// Schedule next retry if under limit
if (_retryAttempts[streamId]! < _maxRetryAttempts) {
Timer(
_retryDelay,
() => _attemptStreamRecovery(streamId, recoveryCallback),
);
}
}
}
bool _needsRecovery(String streamId) {
final metadata = _streamMetadata[streamId];
if (metadata == null) return false;
// Check if stream is marked as suspended
if (metadata['suspended'] == true) {
final suspendedAt = metadata['suspendedAt'] as DateTime?;
if (suspendedAt != null) {
final timeSinceSuspend = DateTime.now().difference(suspendedAt);
// Try to recover suspended streams after 10 seconds
return timeSinceSuspend > const Duration(seconds: 10);
}
}
// Check if stream has been inactive for too long
final lastUpdate = metadata['lastUpdate'] as DateTime?;
if (lastUpdate != null) {
final timeSinceUpdate = DateTime.now().difference(lastUpdate);
// In background: 90 seconds
// In foreground: 2 minutes
final threshold = _isInBackground
? const Duration(seconds: 90)
: const Duration(minutes: 2);
return timeSinceUpdate > threshold;
}
return false;
}
// Platform-specific background execution
void _startBackgroundExecution() {
if (_activeStreams.isNotEmpty) {
_backgroundHandler.startBackgroundExecution(_activeStreams.keys.toList());
}
}
void _markStreamAsSuspended(String streamId) {
final metadata = _streamMetadata[streamId];
if (metadata != null) {
metadata['suspended'] = true;
metadata['suspendedAt'] = DateTime.now();
}
}
void _suspendAllStreams() {
for (final streamId in _activeStreams.keys) {
_markStreamAsSuspended(streamId);
}
}
void _saveStreamStatesForRecovery() {
if (_activeStreams.isEmpty) {
DebugLogger.stream(
'PersistentStreaming: No active streams to save for recovery',
);
return;
}
DebugLogger.stream(
'PersistentStreaming: Saving ${_activeStreams.length} stream states for recovery',
);
// Actually save the stream states through the background handler
final streamIds = _activeStreams.keys.toList();
_backgroundHandler.saveStreamStatesForRecovery(streamIds, 'app_detached');
}
// Update stream metadata when chunks are received
void updateStreamProgress(
String streamId, {
int? chunkSequence,
String? content,
String? appendedContent,
}) {
// Update background handler state
_backgroundHandler.updateStreamState(
streamId,
chunkSequence: chunkSequence,
content: content,
appendedContent: appendedContent,
);
// Update local metadata
final metadata = _streamMetadata[streamId];
if (metadata != null) {
metadata['lastUpdate'] = DateTime.now();
metadata['lastChunkSequence'] =
chunkSequence ?? metadata['lastChunkSequence'];
if (appendedContent != null) {
metadata['lastContent'] =
(metadata['lastContent'] ?? '') + appendedContent;
} else if (content != null) {
metadata['lastContent'] = content;
}
metadata['suspended'] = false; // Mark as active
}
}
// Wake lock management
void _enableWakeLock() async {
try {
await WakelockPlus.enable();
DebugLogger.stream('wake-lock-enabled', scope: 'streaming/persistent');
} catch (e) {
DebugLogger.error(
'wake-lock-enable-failed',
scope: 'streaming/persistent',
error: e,
);
}
}
void _disableWakeLock() async {
try {
await WakelockPlus.disable();
DebugLogger.stream('wake-lock-disabled', scope: 'streaming/persistent');
} catch (e) {
DebugLogger.error(
'wake-lock-disable-failed',
scope: 'streaming/persistent',
error: e,
);
}
}
// Get active stream count
int get activeStreamCount => _activeStreams.length;
// Check if app is in background
bool get isInBackground => _isInBackground;
// Get stream metadata
Map<String, dynamic>? getStreamMetadata(String streamId) {
return _streamMetadata[streamId];
}
// Check if stream is suspended
bool isStreamSuspended(String streamId) {
final metadata = _streamMetadata[streamId];
return metadata?['suspended'] == true;
}
// Force recovery of a specific stream
Future<void> forceRecoverStream(String streamId) async {
final recoveryCallback = _streamRecoveryCallbacks[streamId];
if (recoveryCallback != null) {
_retryAttempts.remove(streamId); // Reset retry count
await _attemptStreamRecovery(streamId, recoveryCallback);
}
}
// Cleanup
void dispose() {
WidgetsBinding.instance.removeObserver(this);
_backgroundTimer?.cancel();
_heartbeatTimer?.cancel();
_connectivitySubscription?.cancel();
_disableWakeLock();
// Stop all background execution
if (_activeStreams.isNotEmpty) {
_backgroundHandler.stopBackgroundExecution(_activeStreams.keys.toList());
}
// Cancel all active streams
for (final subscription in _activeStreams.values) {
subscription.cancel();
}
_activeStreams.clear();
// Close all controllers
for (final controller in _streamControllers.values) {
if (!controller.isClosed) {
controller.close();
}
}
_streamControllers.clear();
// Clear all metadata
_streamMetadata.clear();
_streamRecoveryCallbacks.clear();
_retryAttempts.clear();
// Clear background handler
_backgroundHandler.clearAll();
}
}

View File

@@ -1,3 +1,5 @@
import 'dart:async';
import 'package:flutter/widgets.dart';
import 'package:socket_io_client/socket_io_client.dart' as io;

View File

@@ -1,202 +0,0 @@
import 'dart:async';
import 'dart:convert';
import 'package:dio/dio.dart';
import '../utils/debug_logger.dart';
/// Parser for Server-Sent Events (SSE) streaming responses.
///
/// This matches the web client's EventSourceParserStream behavior,
/// parsing SSE data chunks and extracting OpenAI-compatible deltas.
class SSEStreamParser {
/// Parse an SSE response stream from Dio into text chunks.
///
/// Returns a stream of content strings extracted from OpenAI-style
/// completion chunks.
///
/// [heartbeatTimeout] - Maximum time without data before considering
/// the connection stale (default: 2 minutes)
/// [onHeartbeat] - Callback invoked when any data is received
static Stream<String> parseResponseStream(
ResponseBody responseBody, {
bool splitLargeDeltas = false,
Duration heartbeatTimeout = const Duration(minutes: 2),
void Function()? onHeartbeat,
}) async* {
DateTime lastDataReceived = DateTime.now();
Timer? heartbeatTimer;
// Set up heartbeat monitoring
if (heartbeatTimeout.inMilliseconds > 0) {
heartbeatTimer = Timer.periodic(const Duration(seconds: 30), (timer) {
final timeSinceLastData = DateTime.now().difference(lastDataReceived);
if (timeSinceLastData > heartbeatTimeout) {
DebugLogger.warning(
'SSE stream heartbeat timeout: No data received for ${timeSinceLastData.inSeconds}s',
data: {'timeout': heartbeatTimeout.inSeconds},
);
timer.cancel();
}
});
}
try {
// Buffer for accumulating incomplete SSE messages
String buffer = '';
await for (final chunk in responseBody.stream) {
// Update last data timestamp and invoke heartbeat callback
lastDataReceived = DateTime.now();
onHeartbeat?.call();
// Convert bytes to string (Dio ResponseBody.stream always emits Uint8List)
final text = utf8.decode(chunk as List<int>, allowMalformed: true);
buffer += text;
// Process complete SSE messages (delimited by double newline)
final messages = buffer.split('\n\n');
// Keep the last (potentially incomplete) message in the buffer
buffer = messages.removeLast();
for (final message in messages) {
if (message.trim().isEmpty) continue;
// Parse SSE message
final content = _parseSSEMessage(message);
if (content != null) {
if (content == '[DONE]') {
// Stream completion signal
DebugLogger.stream('SSE stream completed with [DONE] signal');
return;
}
// Split large deltas into smaller chunks for smoother UI updates
if (splitLargeDeltas && content.length > 5) {
yield* _splitIntoChunks(content);
} else {
yield content;
}
}
}
}
// Process any remaining buffered data
if (buffer.trim().isNotEmpty) {
final content = _parseSSEMessage(buffer);
if (content != null && content != '[DONE]') {
yield content;
}
}
} catch (e, stackTrace) {
DebugLogger.error(
'sse-parse-error',
scope: 'streaming/sse',
error: e,
stackTrace: stackTrace,
);
rethrow;
} finally {
// Clean up heartbeat timer
heartbeatTimer?.cancel();
}
}
/// Parse a single SSE message and extract content.
static String? _parseSSEMessage(String message) {
try {
// SSE format: "data: <json>\n" or just the JSON
String dataLine = message.trim();
// Remove "data: " prefix if present
if (dataLine.startsWith('data: ')) {
dataLine = dataLine.substring(6).trim();
} else if (dataLine.startsWith('data:')) {
dataLine = dataLine.substring(5).trim();
}
// Handle [DONE] signal
if (dataLine == '[DONE]' || dataLine == 'DONE') {
return '[DONE]';
}
// Skip empty data
if (dataLine.isEmpty) {
return null;
}
// Parse JSON
try {
final json = jsonDecode(dataLine) as Map<String, dynamic>;
// Handle errors
if (json['error'] != null) {
DebugLogger.error(
'sse-error-response',
scope: 'streaming/sse',
error: json['error'],
);
return null;
}
// Extract content from OpenAI-style response
// Format: { choices: [{ delta: { content: "..." } }] }
final choices = json['choices'];
if (choices is List && choices.isNotEmpty) {
final choice = choices.first as Map<String, dynamic>?;
if (choice != null) {
final delta = choice['delta'] as Map<String, dynamic>?;
if (delta != null) {
final content = delta['content'];
if (content is String && content.isNotEmpty) {
return content;
}
}
}
}
// Alternative format: { content: "..." }
final directContent = json['content'];
if (directContent is String && directContent.isNotEmpty) {
return directContent;
}
return null;
} on FormatException catch (e) {
DebugLogger.warning(
'Failed to parse SSE JSON: $dataLine',
data: {'error': e.toString()},
);
return null;
}
} catch (e) {
DebugLogger.error(
'sse-message-parse-error',
scope: 'streaming/sse',
error: e,
data: {'message': message},
);
return null;
}
}
/// Split large content into smaller chunks for smoother streaming.
/// This matches the web client's streamLargeDeltasAsRandomChunks behavior.
static Stream<String> _splitIntoChunks(String content) async* {
var remaining = content;
while (remaining.isNotEmpty) {
// Random chunk size between 1-3 characters
final chunkSize = (remaining.length < 3)
? remaining.length
: 1 + (DateTime.now().millisecond % 3);
final chunk = remaining.substring(0, chunkSize);
yield chunk;
// Small delay for smoother visual effect (matching web client)
await Future.delayed(const Duration(milliseconds: 5));
remaining = remaining.substring(chunkSize);
}
}
}

View File

@@ -5,7 +5,6 @@ import 'package:flutter/material.dart';
import '../../core/models/chat_message.dart';
import '../../core/models/socket_event.dart';
import '../../core/services/persistent_streaming_service.dart';
import '../../core/services/socket_service.dart';
import '../../core/utils/inactivity_watchdog.dart';
import '../../core/utils/tool_calls_parser.dart';
@@ -163,86 +162,44 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
required void Function() finishStreaming,
required List<ChatMessage> Function() getMessages,
}) {
// Persistable controller to survive brief app suspensions
// Track if streaming has been finished to avoid duplicate cleanup
bool hasFinished = false;
// Wrap finishStreaming to always clear the cancel token
void wrappedFinishStreaming() {
if (hasFinished) return;
hasFinished = true;
api.clearStreamCancelToken(assistantMessageId);
finishStreaming();
}
// Controller for forwarding data to StreamingResponseController
// With WebSocket-only streaming, the HTTP stream closes immediately after returning task_id.
// All actual content comes via WebSocket events, so we don't need persistent stream tracking.
final persistentController = StreamController<String>.broadcast();
final persistentService = PersistentStreamingService();
// Track if stream has received any data
bool hasReceivedData = false;
// Create subscription first so we can reference it in onDone
late final String streamId;
final subscription = stream.listen(
// Subscribe to HTTP stream (mainly for error handling - content comes via WebSocket)
final httpSubscription = stream.listen(
(data) {
hasReceivedData = true;
// Forward any HTTP stream data (rare with WebSocket-only)
persistentController.add(data);
},
onDone: () async {
onDone: () {
DebugLogger.stream(
'Source stream onDone fired, hasReceivedData=$hasReceivedData',
'HTTP stream completed - WebSocket handles content delivery',
);
// If stream closes immediately without data, it's likely due to backgrounding/network drop
// Not a natural completion
if (!hasReceivedData) {
DebugLogger.stream(
'Stream closed without data - likely interrupted, not completing',
);
// Check if app is backgrounding - if so, finish streaming with whatever we have
await Future.delayed(const Duration(milliseconds: 300));
if (persistentService.isInBackground) {
DebugLogger.stream(
'App backgrounding during stream - finishing with current content',
);
finishStreaming();
}
// Don't close the controller to prevent cascading completion handlers
return;
}
// For streams with data, delay to allow background detection
await Future.delayed(const Duration(milliseconds: 500));
final isInBg = persistentService.isInBackground;
DebugLogger.stream(
'Stream onDone check: streamId=$streamId, isInBackground=$isInBg',
);
// Check if we're in background before closing
if (!isInBg) {
DebugLogger.stream(
'Closing stream controller for $streamId (foreground completion)',
);
// Close the controller to trigger StreamingResponseController.onComplete
// WebSocket events continue independently via socket subscriptions
if (!persistentController.isClosed) {
persistentController.close();
} else {
DebugLogger.stream(
'Source stream completed in background for $streamId - keeping open for recovery',
);
// Finish streaming to save the content we have
finishStreaming();
}
},
onError: persistentController.addError,
);
streamId = persistentService.registerStream(
subscription: subscription,
controller: persistentController,
recoveryCallback: () async {
DebugLogger.log(
'Attempting to recover interrupted stream',
scope: 'streaming/helper',
);
},
metadata: {
'conversationId': activeConversationId,
'messageId': assistantMessageId,
'modelId': modelId,
},
);
api.registerPersistentStreamForMessage(assistantMessageId, streamId);
InactivityWatchdog? socketWatchdog;
// Socket subscriptions list - starts empty so non-socket flows can finish via onComplete.
// HTTP subscription is tracked separately and cleaned up in disposeSocketSubscriptions.
final socketSubscriptions = <VoidCallback>[];
final hasSocketSignals =
socketService != null || registerDeltaListener != null;
@@ -268,7 +225,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
if (msgs.isNotEmpty &&
msgs.last.role == 'assistant' &&
msgs.last.isStreaming) {
finishStreaming();
wrappedFinishStreaming();
}
} catch (_) {}
socketWatchdog?.stop();
@@ -284,15 +241,19 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
int imageCollectionRequestId = 0;
void disposeSocketSubscriptions() {
if (socketSubscriptions.isEmpty) {
return;
}
// Cancel HTTP subscription
try {
httpSubscription.cancel();
} catch (_) {}
// Cancel socket subscriptions
for (final dispose in socketSubscriptions) {
try {
dispose();
} catch (_) {}
}
socketSubscriptions.clear();
imageCollectionDebounce?.cancel();
imageCollectionDebounce = null;
pendingImageContent = null;
@@ -481,7 +442,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
sessionId: sessionId,
);
} catch (_) {}
finishStreaming();
wrappedFinishStreaming();
socketWatchdog?.stop();
return;
}
@@ -502,7 +463,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
sessionId: sessionId,
);
} catch (_) {}
finishStreaming();
wrappedFinishStreaming();
socketWatchdog?.stop();
return;
}
@@ -563,7 +524,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
try {
socketService?.offEvent(channel);
} catch (_) {}
finishStreaming();
wrappedFinishStreaming();
socketWatchdog?.stop();
return;
}
@@ -781,13 +742,13 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
}
} catch (_) {
} finally {
finishStreaming();
wrappedFinishStreaming();
}
});
return;
}
}
finishStreaming();
wrappedFinishStreaming();
socketWatchdog?.stop();
}
}
@@ -816,7 +777,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
});
}
disposeSocketSubscriptions();
finishStreaming();
wrappedFinishStreaming();
} else if (type == 'chat:message:follow_ups' && payload != null) {
DebugLogger.log('Received follow-ups event', scope: 'streaming/helper');
final followMap = _asStringMap(payload);
@@ -966,7 +927,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
return message.copyWith(statusHistory: filtered);
});
// Ensure UI exits streaming state
finishStreaming();
wrappedFinishStreaming();
socketWatchdog?.stop();
} else if ((type == 'chat:message:delta' || type == 'message') &&
payload != null) {
@@ -1246,17 +1207,11 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
}
},
onComplete: () {
api.clearPersistentStreamForMessage(
assistantMessageId,
expectedStreamId: streamId,
);
// Unregister from persistent service
persistentService.unregisterStream(streamId);
// Stream completion without socket subscriptions indicates a simple flow
// For WebSocket flows, completion should be handled by socket events (done: true)
// HTTP stream completed - cleanup already done in onDone handler.
// For WebSocket flows, actual completion is handled by socket events (done: true).
// Only finish streaming here if there are no socket subscriptions (simple/legacy flow).
if (socketSubscriptions.isEmpty) {
finishStreaming();
wrappedFinishStreaming();
Future.microtask(refreshConversationSnapshot);
}
},
@@ -1272,14 +1227,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
},
);
api.clearPersistentStreamForMessage(
assistantMessageId,
expectedStreamId: streamId,
);
try {
persistentService.unregisterStream(streamId);
} catch (_) {}
// Check if this is a recoverable error (network issues, etc.)
final errorText = error.toString();
final isRecoverable =
@@ -1303,7 +1250,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
}
disposeSocketSubscriptions();
finishStreaming();
wrappedFinishStreaming();
Future.microtask(refreshConversationSnapshot);
socketWatchdog?.stop();
},

View File

@@ -17,7 +17,7 @@ typedef StreamingErrorCallback =
///
/// This wraps a [StreamSubscription], normalises error handling, and exposes
/// a unified cancel method so UI layers can stop streaming without having to
/// know the underlying transport (SSE, polling, etc.).
/// know the underlying transport (WebSocket, polling, etc.).
class StreamingResponseController {
StreamingResponseController({
required Stream<String> stream,