Merge pull request #184 from cogwheel0/streaming-logic-simplification
feat(streaming): Simplify streaming logic and remove persistent tracking
This commit is contained in:
@@ -17,7 +17,7 @@ if (keystorePropertiesFile.exists()) {
|
||||
android {
|
||||
namespace = "app.cogwheel.conduit"
|
||||
compileSdk = 36
|
||||
ndkVersion = "27.0.12077973"
|
||||
ndkVersion = "29.0.14206865"
|
||||
|
||||
defaultConfig {
|
||||
applicationId = "app.cogwheel.conduit"
|
||||
|
||||
@@ -13,7 +13,6 @@ import '../services/app_intents_service.dart';
|
||||
import '../services/quick_actions_service.dart';
|
||||
import '../models/conversation.dart';
|
||||
import '../services/background_streaming_handler.dart';
|
||||
import '../services/persistent_streaming_service.dart';
|
||||
import '../services/socket_service.dart';
|
||||
import '../../features/onboarding/views/onboarding_sheet.dart';
|
||||
import '../../features/chat/providers/chat_providers.dart';
|
||||
@@ -198,15 +197,6 @@ class AppStartupFlow extends _$AppStartupFlow {
|
||||
keepAlive(socketPersistenceProvider);
|
||||
});
|
||||
|
||||
// Ensure persistent streaming uses the shared connectivity service
|
||||
final connectivityService = ref.read(connectivityServiceProvider);
|
||||
Future<void>.delayed(const Duration(milliseconds: 160), () {
|
||||
if (!ref.mounted) return;
|
||||
PersistentStreamingService().attachConnectivityService(
|
||||
connectivityService,
|
||||
);
|
||||
});
|
||||
|
||||
// Warm the conversations list in the background as soon as possible,
|
||||
// but avoid doing so on poor connectivity to reduce startup load.
|
||||
// Apply a small randomized delay to smooth load spikes across app wakes.
|
||||
@@ -432,7 +422,7 @@ class _ForegroundRefreshObserver extends WidgetsBindingObserver {
|
||||
}
|
||||
|
||||
/// Attempts to keep the realtime socket connection alive while the app is
|
||||
/// backgrounded, similar to how PersistentStreamingService works for streams.
|
||||
/// backgrounded using BackgroundStreamingHandler for platform-specific handling.
|
||||
///
|
||||
/// Notes:
|
||||
/// - iOS: limited to short background task windows; we send periodic keepAlive.
|
||||
|
||||
@@ -7,9 +7,9 @@ import '../auth/auth_state_manager.dart';
|
||||
import '../providers/app_providers.dart';
|
||||
import '../services/connectivity_service.dart';
|
||||
import '../services/navigation_service.dart';
|
||||
import '../services/persistent_streaming_service.dart';
|
||||
import '../utils/debug_logger.dart';
|
||||
import '../../features/auth/providers/unified_auth_providers.dart';
|
||||
import '../../features/chat/providers/chat_providers.dart';
|
||||
import '../../features/auth/views/authentication_page.dart';
|
||||
import '../../features/auth/views/connect_signin_page.dart';
|
||||
import '../../features/auth/views/connection_issue_page.dart';
|
||||
@@ -37,6 +37,7 @@ class RouterNotifier extends ChangeNotifier {
|
||||
connectivityStatusProvider,
|
||||
_onStateChanged,
|
||||
),
|
||||
ref.listen<bool>(isChatStreamingProvider, _onStateChanged),
|
||||
];
|
||||
}
|
||||
|
||||
@@ -112,8 +113,8 @@ class RouterNotifier extends ChangeNotifier {
|
||||
// 2. Connectivity is explicitly offline
|
||||
// 3. Auth is authenticated (don't interrupt auth flow)
|
||||
// 4. App is in foreground and offline warning isn't suppressed
|
||||
// 5. No active streaming is in progress (avoid interrupting token streams)
|
||||
final hasActiveStreams = PersistentStreamingService().activeStreamCount > 0;
|
||||
// 5. No active streaming is in progress (avoid interrupting chat streams)
|
||||
final hasActiveStreams = ref.read(isChatStreamingProvider);
|
||||
final shouldShowConnectionIssue =
|
||||
!reviewerMode &&
|
||||
connectivity == ConnectivityStatus.offline &&
|
||||
|
||||
@@ -19,9 +19,7 @@ import '../models/prompt.dart';
|
||||
import '../auth/api_auth_interceptor.dart';
|
||||
import '../error/api_error_interceptor.dart';
|
||||
// Tool-call details are parsed in the UI layer to render collapsible blocks
|
||||
import 'persistent_streaming_service.dart';
|
||||
import 'connectivity_service.dart';
|
||||
import 'sse_stream_parser.dart';
|
||||
import '../utils/debug_logger.dart';
|
||||
import 'conversation_parsing.dart';
|
||||
import 'worker_manager.dart';
|
||||
@@ -2596,36 +2594,12 @@ class ApiService {
|
||||
// Chat streaming with conversation context
|
||||
// Track cancellable streaming requests by messageId for stop parity
|
||||
final Map<String, CancelToken> _streamCancelTokens = {};
|
||||
final Map<String, String> _messagePersistentStreamIds = {};
|
||||
|
||||
/// Associates a streaming message with its persistent stream identifier.
|
||||
void registerPersistentStreamForMessage(String messageId, String streamId) {
|
||||
_messagePersistentStreamIds[messageId] = streamId;
|
||||
}
|
||||
|
||||
/// Removes the persistent stream mapping for a message if it matches.
|
||||
///
|
||||
/// Returns the removed persistent stream identifier when one existed and
|
||||
/// matched the optional [expectedStreamId].
|
||||
String? clearPersistentStreamForMessage(
|
||||
String messageId, {
|
||||
String? expectedStreamId,
|
||||
}) {
|
||||
final current = _messagePersistentStreamIds[messageId];
|
||||
if (current == null) {
|
||||
return null;
|
||||
}
|
||||
if (expectedStreamId != null && current != expectedStreamId) {
|
||||
return null;
|
||||
}
|
||||
return _messagePersistentStreamIds.remove(messageId);
|
||||
}
|
||||
|
||||
// Send message using dual-stream approach (HTTP SSE + WebSocket events).
|
||||
// Matches OpenWebUI web client behavior:
|
||||
// - HTTP SSE stream provides immediate content chunks
|
||||
// - WebSocket events deliver metadata, tool status, sources, follow-ups
|
||||
// - Both streams run in parallel for reliability
|
||||
// Send message using WebSocket-only streaming.
|
||||
// Matches OpenWebUI web client behavior when session_id + chat_id + message_id are provided:
|
||||
// - HTTP POST returns JSON with task_id (no SSE streaming)
|
||||
// - All content and metadata delivered via WebSocket events
|
||||
// - Events: chat:completion, chat:message:delta, status, source, follow_ups, etc.
|
||||
// Returns a record with (stream, messageId, sessionId, socketSessionId, isBackgroundFlow)
|
||||
({
|
||||
Stream<String> stream,
|
||||
@@ -2790,7 +2764,7 @@ class ApiService {
|
||||
_traceApi('Including non-image files in request: ${allFiles.length}');
|
||||
}
|
||||
|
||||
_traceApi('Preparing dual-stream chat request (HTTP SSE + WebSocket)');
|
||||
_traceApi('Preparing WebSocket-only chat request');
|
||||
_traceApi('Model: $model');
|
||||
_traceApi('Message count: ${processedMessages.length}');
|
||||
|
||||
@@ -2830,118 +2804,85 @@ class ApiService {
|
||||
);
|
||||
_traceApi('Has background_tasks: ${data.containsKey('background_tasks')}');
|
||||
|
||||
_traceApi('Initiating dual-stream request (HTTP SSE + WebSocket)');
|
||||
_traceApi('Initiating WebSocket-only chat request');
|
||||
_traceApi('Posting to /api/chat/completions');
|
||||
|
||||
// Create a cancel token for this request
|
||||
final cancelToken = CancelToken();
|
||||
_streamCancelTokens[messageId] = cancelToken;
|
||||
|
||||
// Start HTTP SSE stream (matches web client behavior)
|
||||
// The WebSocket events will run in parallel via streaming_helper.dart
|
||||
// Send HTTP request to initiate chat task
|
||||
// With session_id + chat_id + message_id, the server returns a task_id
|
||||
// and all streaming happens via WebSocket events (not SSE)
|
||||
() async {
|
||||
try {
|
||||
final resp = await _dio.post(
|
||||
'/api/chat/completions',
|
||||
data: data,
|
||||
options: Options(
|
||||
responseType: ResponseType.stream,
|
||||
// Extended timeout for streaming responses - allow up to 10 minutes
|
||||
// for long-running tool calls and reasoning
|
||||
receiveTimeout: const Duration(minutes: 10),
|
||||
// Shorter send timeout for the initial request
|
||||
responseType: ResponseType.json,
|
||||
receiveTimeout: const Duration(seconds: 30),
|
||||
sendTimeout: const Duration(seconds: 30),
|
||||
headers: {
|
||||
'Accept': 'text/event-stream',
|
||||
// Enable HTTP keep-alive to maintain connection in background
|
||||
'Connection': 'keep-alive',
|
||||
// Request server to send keep-alive messages
|
||||
'Cache-Control': 'no-cache',
|
||||
},
|
||||
),
|
||||
cancelToken: cancelToken,
|
||||
);
|
||||
|
||||
final respData = resp.data;
|
||||
|
||||
// Check if we got a task_id response (non-streaming)
|
||||
if (respData is Map && respData['task_id'] != null) {
|
||||
if (respData is Map) {
|
||||
if (respData['task_id'] != null) {
|
||||
final taskId = respData['task_id'].toString();
|
||||
_traceApi('Background task created: $taskId');
|
||||
|
||||
// In this case, all streaming will happen via WebSocket
|
||||
// Close HTTP stream but keep WebSocket active
|
||||
} else if (respData['status'] == true) {
|
||||
_traceApi('Chat task initiated successfully');
|
||||
} else if (respData['error'] != null) {
|
||||
_traceApi('Server error: ${respData['error']}');
|
||||
if (!streamController.isClosed) {
|
||||
streamController.close();
|
||||
streamController.addError(Exception(respData['error'].toString()));
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// We have a streaming response body
|
||||
if (respData is ResponseBody) {
|
||||
_traceApi('HTTP SSE stream started for message: $messageId');
|
||||
|
||||
// Parse SSE stream and forward chunks to controller
|
||||
await for (final chunk in SSEStreamParser.parseResponseStream(
|
||||
respData,
|
||||
splitLargeDeltas: false,
|
||||
heartbeatTimeout: const Duration(minutes: 2),
|
||||
onHeartbeat: () {
|
||||
// Notify persistent streaming service that connection is alive
|
||||
final persistentStreamId = _messagePersistentStreamIds[messageId];
|
||||
if (persistentStreamId != null) {
|
||||
PersistentStreamingService().updateStreamProgress(
|
||||
persistentStreamId,
|
||||
chunkSequence: DateTime.now().millisecondsSinceEpoch,
|
||||
);
|
||||
}
|
||||
},
|
||||
)) {
|
||||
if (!streamController.isClosed) {
|
||||
streamController.add(chunk);
|
||||
} else {
|
||||
_traceApi('Stream controller closed, stopping SSE parsing');
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
_traceApi('HTTP SSE stream completed for message: $messageId');
|
||||
} else {
|
||||
_traceApi('Unexpected response type: ${respData.runtimeType}');
|
||||
}
|
||||
|
||||
// Close the HTTP stream controller
|
||||
// WebSocket events will continue independently via streaming_helper
|
||||
// Close HTTP stream controller - WebSocket handles all content delivery
|
||||
if (!streamController.isClosed) {
|
||||
streamController.close();
|
||||
}
|
||||
} on DioException catch (e) {
|
||||
if (CancelToken.isCancel(e)) {
|
||||
_traceApi('HTTP stream cancelled for message: $messageId');
|
||||
_traceApi('Request cancelled for message: $messageId');
|
||||
} else {
|
||||
_traceApi('HTTP stream error: $e');
|
||||
_traceApi('Request error: $e');
|
||||
if (!streamController.isClosed) {
|
||||
streamController.addError(e);
|
||||
streamController.close();
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
_traceApi('Unexpected error in HTTP stream: $e');
|
||||
_traceApi('Unexpected error: $e');
|
||||
if (!streamController.isClosed) {
|
||||
streamController.addError(e);
|
||||
streamController.close();
|
||||
}
|
||||
} finally {
|
||||
_streamCancelTokens.remove(messageId);
|
||||
}
|
||||
// Note: Don't remove cancel token here - it should remain until WebSocket
|
||||
// streaming finishes so Stop button can cancel the active generation.
|
||||
// Token is removed by clearStreamCancelToken() when streaming completes.
|
||||
}();
|
||||
|
||||
// Determine if this is actually a background flow based on the request payload
|
||||
final bool isBackgroundFlow =
|
||||
hasBackgroundTasksPayload ||
|
||||
(toolIds != null && toolIds.isNotEmpty) ||
|
||||
(toolServers != null && toolServers.isNotEmpty) ||
|
||||
enableWebSearch ||
|
||||
enableImageGeneration;
|
||||
|
||||
return (
|
||||
stream: streamController.stream,
|
||||
messageId: messageId,
|
||||
sessionId: sessionId,
|
||||
socketSessionId: socketSessionId,
|
||||
isBackgroundFlow: true,
|
||||
isBackgroundFlow: isBackgroundFlow,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -2975,13 +2916,12 @@ class ApiService {
|
||||
token.cancel('User cancelled');
|
||||
}
|
||||
} catch (_) {}
|
||||
|
||||
try {
|
||||
final pid = clearPersistentStreamForMessage(messageId);
|
||||
if (pid != null) {
|
||||
PersistentStreamingService().unregisterStream(pid);
|
||||
}
|
||||
} catch (_) {}
|
||||
|
||||
/// Clears the cancel token for a message when streaming completes normally.
|
||||
/// Called by streaming_helper when finishStreaming is invoked.
|
||||
void clearStreamCancelToken(String messageId) {
|
||||
_streamCancelTokens.remove(messageId);
|
||||
}
|
||||
|
||||
// File upload for RAG
|
||||
|
||||
@@ -1,627 +0,0 @@
|
||||
import 'dart:async';
|
||||
import 'package:flutter/material.dart';
|
||||
import 'package:wakelock_plus/wakelock_plus.dart';
|
||||
import 'background_streaming_handler.dart';
|
||||
import 'connectivity_service.dart';
|
||||
import '../utils/debug_logger.dart';
|
||||
|
||||
class PersistentStreamingService with WidgetsBindingObserver {
|
||||
static final PersistentStreamingService _instance =
|
||||
PersistentStreamingService._internal();
|
||||
factory PersistentStreamingService() => _instance;
|
||||
PersistentStreamingService._internal() {
|
||||
_initialize();
|
||||
}
|
||||
|
||||
// Active streams registry
|
||||
final Map<String, StreamSubscription> _activeStreams = {};
|
||||
final Map<String, StreamController> _streamControllers = {};
|
||||
final Map<String, Function> _streamRecoveryCallbacks = {};
|
||||
final Map<String, Map<String, dynamic>> _streamMetadata = {};
|
||||
|
||||
// App lifecycle state
|
||||
// AppLifecycleState? _lastLifecycleState; // Removed as it's unused
|
||||
bool _isInBackground = false;
|
||||
Timer? _backgroundTimer;
|
||||
Timer? _heartbeatTimer;
|
||||
|
||||
// Background streaming handler
|
||||
late final BackgroundStreamingHandler _backgroundHandler;
|
||||
|
||||
// Connectivity monitoring
|
||||
StreamSubscription<bool>? _connectivitySubscription;
|
||||
ConnectivityService? _connectivityService;
|
||||
bool _hasConnectivity = true;
|
||||
|
||||
// Recovery state
|
||||
final Map<String, int> _retryAttempts = {};
|
||||
static const int _maxRetryAttempts = 3;
|
||||
static const Duration _retryDelay = Duration(seconds: 2);
|
||||
|
||||
void _initialize() {
|
||||
WidgetsBinding.instance.addObserver(this);
|
||||
_backgroundHandler = BackgroundStreamingHandler.instance;
|
||||
_setupBackgroundHandlerCallbacks();
|
||||
_startHeartbeat();
|
||||
}
|
||||
|
||||
void _setupBackgroundHandlerCallbacks() {
|
||||
_backgroundHandler.onServiceFailed = (error, errorType, streamIds) {
|
||||
DebugLogger.error(
|
||||
'background-service-failed',
|
||||
scope: 'streaming/persistent',
|
||||
error: '$errorType: $error',
|
||||
data: {'affectedStreams': streamIds},
|
||||
);
|
||||
|
||||
// Attempt immediate recovery for failed streams
|
||||
for (final streamId in streamIds) {
|
||||
final callback = _streamRecoveryCallbacks[streamId];
|
||||
if (callback != null) {
|
||||
// Schedule recovery after a short delay
|
||||
Future.delayed(const Duration(seconds: 2), () {
|
||||
if (_activeStreams.containsKey(streamId)) {
|
||||
_attemptStreamRecovery(streamId, callback);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
_backgroundHandler.onStreamsSuspending = (streamIds) {
|
||||
DebugLogger.stream(
|
||||
'PersistentStreaming: Streams suspending - $streamIds',
|
||||
);
|
||||
// Mark streams as suspended but don't close them yet
|
||||
for (final streamId in streamIds) {
|
||||
_markStreamAsSuspended(streamId);
|
||||
}
|
||||
};
|
||||
|
||||
_backgroundHandler.onBackgroundTaskExpiring = () {
|
||||
DebugLogger.stream('PersistentStreaming: Background task expiring');
|
||||
// Save states and prepare for recovery
|
||||
_saveStreamStatesForRecovery();
|
||||
};
|
||||
|
||||
_backgroundHandler
|
||||
.onBackgroundTaskExtended = (streamIds, estimatedSeconds) {
|
||||
DebugLogger.stream(
|
||||
'PersistentStreaming: Background task extended for $estimatedSeconds seconds',
|
||||
);
|
||||
// BGTaskScheduler has given us more time - streams can continue
|
||||
for (final streamId in streamIds) {
|
||||
final metadata = _streamMetadata[streamId];
|
||||
if (metadata != null) {
|
||||
metadata['bgTaskExtended'] = true;
|
||||
metadata['bgTaskExtendedAt'] = DateTime.now();
|
||||
metadata['bgTaskEstimatedTime'] = estimatedSeconds;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
_backgroundHandler.onBackgroundKeepAlive = () {
|
||||
DebugLogger.stream('PersistentStreaming: Background keep-alive signal');
|
||||
// BGTaskScheduler is keeping us alive - we can continue streaming
|
||||
_heartbeatTimer?.cancel();
|
||||
_startHeartbeat(); // Restart heartbeat timer
|
||||
};
|
||||
|
||||
_backgroundHandler.shouldContinueInBackground = () {
|
||||
return _activeStreams.isNotEmpty;
|
||||
};
|
||||
}
|
||||
|
||||
void attachConnectivityService(ConnectivityService service) {
|
||||
if (identical(_connectivityService, service)) {
|
||||
return;
|
||||
}
|
||||
|
||||
_connectivitySubscription?.cancel();
|
||||
_connectivityService = service;
|
||||
_connectivitySubscription = service.statusStream
|
||||
.map((status) => status == ConnectivityStatus.online)
|
||||
.listen(_handleConnectivityChange);
|
||||
}
|
||||
|
||||
void _handleConnectivityChange(bool connected) {
|
||||
final wasConnected = _hasConnectivity;
|
||||
_hasConnectivity = connected;
|
||||
|
||||
if (!wasConnected && connected) {
|
||||
DebugLogger.stream(
|
||||
'PersistentStreaming: Connectivity restored, recovering streams',
|
||||
);
|
||||
_recoverActiveStreams();
|
||||
} else if (wasConnected && !connected) {
|
||||
DebugLogger.stream(
|
||||
'PersistentStreaming: Connectivity lost, suspending streams',
|
||||
);
|
||||
_suspendAllStreams();
|
||||
}
|
||||
}
|
||||
|
||||
void _startHeartbeat() {
|
||||
_heartbeatTimer = Timer.periodic(const Duration(seconds: 30), (_) {
|
||||
if (_activeStreams.isNotEmpty && _isInBackground) {
|
||||
_backgroundHandler.keepAlive();
|
||||
|
||||
// Check for stale streams during background operation
|
||||
_checkStreamHealth();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void _checkStreamHealth() {
|
||||
final now = DateTime.now();
|
||||
final staleStreams = <String>[];
|
||||
|
||||
for (final entry in _streamMetadata.entries) {
|
||||
final streamId = entry.key;
|
||||
final metadata = entry.value;
|
||||
final lastUpdate = metadata['lastUpdate'] as DateTime?;
|
||||
|
||||
if (lastUpdate != null) {
|
||||
final timeSinceUpdate = now.difference(lastUpdate);
|
||||
|
||||
// If no update in 90 seconds while in background, consider stale
|
||||
if (timeSinceUpdate > const Duration(seconds: 90)) {
|
||||
DebugLogger.warning(
|
||||
'Stream $streamId appears stale: ${timeSinceUpdate.inSeconds}s since last update',
|
||||
);
|
||||
staleStreams.add(streamId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Attempt recovery for stale streams
|
||||
for (final streamId in staleStreams) {
|
||||
final callback = _streamRecoveryCallbacks[streamId];
|
||||
if (callback != null && _retryAttempts[streamId] == null) {
|
||||
DebugLogger.stream('Initiating recovery for stale stream: $streamId');
|
||||
_attemptStreamRecovery(streamId, callback);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@override
|
||||
void didChangeAppLifecycleState(AppLifecycleState state) {
|
||||
// _lastLifecycleState = state; // Removed as it's unused
|
||||
|
||||
switch (state) {
|
||||
case AppLifecycleState.paused:
|
||||
case AppLifecycleState.inactive:
|
||||
_onAppBackground();
|
||||
break;
|
||||
case AppLifecycleState.resumed:
|
||||
_onAppForeground();
|
||||
break;
|
||||
case AppLifecycleState.detached:
|
||||
case AppLifecycleState.hidden:
|
||||
// Handle app termination
|
||||
_onAppDetached();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void _onAppBackground() {
|
||||
DebugLogger.stream('PersistentStreamingService: App went to background');
|
||||
_isInBackground = true;
|
||||
|
||||
// Enable wake lock to prevent device sleep during streaming
|
||||
if (_activeStreams.isNotEmpty) {
|
||||
_enableWakeLock();
|
||||
_startBackgroundExecution();
|
||||
}
|
||||
}
|
||||
|
||||
void _onAppForeground() {
|
||||
DebugLogger.stream(
|
||||
'PersistentStreamingService: App returned to foreground',
|
||||
);
|
||||
_isInBackground = false;
|
||||
|
||||
// Cancel background timer
|
||||
_backgroundTimer?.cancel();
|
||||
_backgroundTimer = null;
|
||||
|
||||
// Disable wake lock if no active streams
|
||||
if (_activeStreams.isEmpty) {
|
||||
_disableWakeLock();
|
||||
}
|
||||
|
||||
// Close any controllers for streams that were suspended in background
|
||||
// This allows the onComplete handlers to fire now that we're in foreground
|
||||
final suspendedStreams = _streamMetadata.entries
|
||||
.where((e) => e.value['suspended'] == true)
|
||||
.map((e) => e.key)
|
||||
.toList();
|
||||
|
||||
for (final streamId in suspendedStreams) {
|
||||
final controller = _streamControllers[streamId];
|
||||
if (controller != null && !controller.isClosed) {
|
||||
DebugLogger.stream(
|
||||
'PersistentStreamingService: Closing suspended stream $streamId controller in foreground',
|
||||
);
|
||||
controller.close();
|
||||
}
|
||||
}
|
||||
|
||||
// Check and recover any interrupted streams
|
||||
_recoverActiveStreams();
|
||||
}
|
||||
|
||||
void _onAppDetached() {
|
||||
DebugLogger.stream('PersistentStreamingService: App detached');
|
||||
|
||||
// Save stream states for recovery
|
||||
_saveStreamStatesForRecovery();
|
||||
|
||||
// Clean up
|
||||
_backgroundTimer?.cancel();
|
||||
_heartbeatTimer?.cancel();
|
||||
_disableWakeLock();
|
||||
}
|
||||
|
||||
// Register a stream for persistent handling
|
||||
String registerStream({
|
||||
required StreamSubscription subscription,
|
||||
required StreamController controller,
|
||||
Function? recoveryCallback,
|
||||
Map<String, dynamic>? metadata,
|
||||
}) {
|
||||
final streamId = DateTime.now().millisecondsSinceEpoch.toString();
|
||||
|
||||
_activeStreams[streamId] = subscription;
|
||||
_streamControllers[streamId] = controller;
|
||||
if (recoveryCallback != null) {
|
||||
_streamRecoveryCallbacks[streamId] = recoveryCallback;
|
||||
}
|
||||
|
||||
// Store metadata for recovery
|
||||
if (metadata != null) {
|
||||
_streamMetadata[streamId] = metadata;
|
||||
|
||||
// Register with background handler
|
||||
_backgroundHandler.registerStream(
|
||||
streamId,
|
||||
conversationId: metadata['conversationId'] ?? '',
|
||||
messageId: metadata['messageId'] ?? '',
|
||||
sessionId: metadata['sessionId'],
|
||||
lastChunkSequence: metadata['lastChunkSequence'],
|
||||
lastContent: metadata['lastContent'],
|
||||
);
|
||||
}
|
||||
|
||||
// Enable wake lock when streaming starts
|
||||
if (_activeStreams.length == 1) {
|
||||
_enableWakeLock();
|
||||
}
|
||||
|
||||
// Start background execution if app is backgrounded
|
||||
if (_isInBackground) {
|
||||
_startBackgroundExecution();
|
||||
}
|
||||
|
||||
DebugLogger.stream(
|
||||
'PersistentStreamingService: Registered stream $streamId',
|
||||
);
|
||||
|
||||
return streamId;
|
||||
}
|
||||
|
||||
// Unregister a stream
|
||||
void unregisterStream(String streamId, {bool saveForRecovery = false}) {
|
||||
// If app is in background and stream is unregistering, it might be due to
|
||||
// network interruption - save state for recovery instead of just dropping it
|
||||
if (_isInBackground &&
|
||||
!saveForRecovery &&
|
||||
_streamMetadata.containsKey(streamId)) {
|
||||
DebugLogger.stream(
|
||||
'PersistentStreamingService: Stream $streamId interrupted in background, saving for recovery',
|
||||
);
|
||||
// Don't unregister yet - keep it for recovery
|
||||
_markStreamAsSuspended(streamId);
|
||||
return;
|
||||
}
|
||||
|
||||
_activeStreams.remove(streamId);
|
||||
_streamControllers.remove(streamId);
|
||||
_streamRecoveryCallbacks.remove(streamId);
|
||||
_streamMetadata.remove(streamId);
|
||||
_retryAttempts.remove(streamId);
|
||||
|
||||
// Unregister from background handler
|
||||
_backgroundHandler.unregisterStream(streamId);
|
||||
|
||||
// Stop background execution if no more streams
|
||||
if (_activeStreams.isEmpty) {
|
||||
_backgroundHandler.stopBackgroundExecution([streamId]);
|
||||
_disableWakeLock();
|
||||
}
|
||||
|
||||
DebugLogger.stream(
|
||||
'PersistentStreamingService: Unregistered stream $streamId',
|
||||
);
|
||||
}
|
||||
|
||||
// Check if a stream is still active
|
||||
bool isStreamActive(String streamId) {
|
||||
return _activeStreams.containsKey(streamId);
|
||||
}
|
||||
|
||||
// Recover interrupted streams
|
||||
Future<void> _recoverActiveStreams() async {
|
||||
if (!_hasConnectivity) {
|
||||
DebugLogger.stream(
|
||||
'PersistentStreaming: No connectivity, skipping recovery',
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// First, try to recover from background handler saved states
|
||||
final savedStates = await _backgroundHandler.recoverStreamStates();
|
||||
for (final state in savedStates) {
|
||||
if (!state.isStale()) {
|
||||
await _recoverStreamFromState(state);
|
||||
}
|
||||
}
|
||||
|
||||
// Then check active streams for recovery
|
||||
for (final entry in _streamRecoveryCallbacks.entries) {
|
||||
final streamId = entry.key;
|
||||
final recoveryCallback = entry.value;
|
||||
|
||||
// Check if stream was interrupted or needs recovery
|
||||
final subscription = _activeStreams[streamId];
|
||||
if (subscription == null || _needsRecovery(streamId)) {
|
||||
await _attemptStreamRecovery(streamId, recoveryCallback);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Future<void> _recoverStreamFromState(StreamState state) async {
|
||||
final recoveryCallback = _streamRecoveryCallbacks[state.streamId];
|
||||
if (recoveryCallback != null) {
|
||||
DebugLogger.stream(
|
||||
'PersistentStreaming: Recovering stream from saved state: ${state.streamId}',
|
||||
);
|
||||
await _attemptStreamRecovery(state.streamId, recoveryCallback);
|
||||
}
|
||||
}
|
||||
|
||||
Future<void> _attemptStreamRecovery(
|
||||
String streamId,
|
||||
Function recoveryCallback,
|
||||
) async {
|
||||
final attempts = _retryAttempts[streamId] ?? 0;
|
||||
if (attempts >= _maxRetryAttempts) {
|
||||
DebugLogger.warning(
|
||||
'PersistentStreaming: Max retry attempts reached for stream $streamId',
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
DebugLogger.stream(
|
||||
'PersistentStreaming: Recovering stream $streamId (attempt ${attempts + 1})',
|
||||
);
|
||||
|
||||
try {
|
||||
_retryAttempts[streamId] = attempts + 1;
|
||||
|
||||
// Add exponential backoff delay
|
||||
if (attempts > 0) {
|
||||
final delay = _retryDelay * (1 << (attempts - 1)); // 2s, 4s, 8s...
|
||||
await Future.delayed(delay);
|
||||
}
|
||||
|
||||
// Call recovery callback to restart the stream
|
||||
await recoveryCallback();
|
||||
|
||||
// Reset retry count on success
|
||||
_retryAttempts.remove(streamId);
|
||||
} catch (e) {
|
||||
DebugLogger.error(
|
||||
'recover-failed',
|
||||
scope: 'streaming/persistent',
|
||||
error: e,
|
||||
data: {'streamId': streamId},
|
||||
);
|
||||
|
||||
// Schedule next retry if under limit
|
||||
if (_retryAttempts[streamId]! < _maxRetryAttempts) {
|
||||
Timer(
|
||||
_retryDelay,
|
||||
() => _attemptStreamRecovery(streamId, recoveryCallback),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool _needsRecovery(String streamId) {
|
||||
final metadata = _streamMetadata[streamId];
|
||||
if (metadata == null) return false;
|
||||
|
||||
// Check if stream is marked as suspended
|
||||
if (metadata['suspended'] == true) {
|
||||
final suspendedAt = metadata['suspendedAt'] as DateTime?;
|
||||
if (suspendedAt != null) {
|
||||
final timeSinceSuspend = DateTime.now().difference(suspendedAt);
|
||||
// Try to recover suspended streams after 10 seconds
|
||||
return timeSinceSuspend > const Duration(seconds: 10);
|
||||
}
|
||||
}
|
||||
|
||||
// Check if stream has been inactive for too long
|
||||
final lastUpdate = metadata['lastUpdate'] as DateTime?;
|
||||
if (lastUpdate != null) {
|
||||
final timeSinceUpdate = DateTime.now().difference(lastUpdate);
|
||||
// In background: 90 seconds
|
||||
// In foreground: 2 minutes
|
||||
final threshold = _isInBackground
|
||||
? const Duration(seconds: 90)
|
||||
: const Duration(minutes: 2);
|
||||
return timeSinceUpdate > threshold;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
// Platform-specific background execution
|
||||
void _startBackgroundExecution() {
|
||||
if (_activeStreams.isNotEmpty) {
|
||||
_backgroundHandler.startBackgroundExecution(_activeStreams.keys.toList());
|
||||
}
|
||||
}
|
||||
|
||||
void _markStreamAsSuspended(String streamId) {
|
||||
final metadata = _streamMetadata[streamId];
|
||||
if (metadata != null) {
|
||||
metadata['suspended'] = true;
|
||||
metadata['suspendedAt'] = DateTime.now();
|
||||
}
|
||||
}
|
||||
|
||||
void _suspendAllStreams() {
|
||||
for (final streamId in _activeStreams.keys) {
|
||||
_markStreamAsSuspended(streamId);
|
||||
}
|
||||
}
|
||||
|
||||
void _saveStreamStatesForRecovery() {
|
||||
if (_activeStreams.isEmpty) {
|
||||
DebugLogger.stream(
|
||||
'PersistentStreaming: No active streams to save for recovery',
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
DebugLogger.stream(
|
||||
'PersistentStreaming: Saving ${_activeStreams.length} stream states for recovery',
|
||||
);
|
||||
|
||||
// Actually save the stream states through the background handler
|
||||
final streamIds = _activeStreams.keys.toList();
|
||||
_backgroundHandler.saveStreamStatesForRecovery(streamIds, 'app_detached');
|
||||
}
|
||||
|
||||
// Update stream metadata when chunks are received
|
||||
void updateStreamProgress(
|
||||
String streamId, {
|
||||
int? chunkSequence,
|
||||
String? content,
|
||||
String? appendedContent,
|
||||
}) {
|
||||
// Update background handler state
|
||||
_backgroundHandler.updateStreamState(
|
||||
streamId,
|
||||
chunkSequence: chunkSequence,
|
||||
content: content,
|
||||
appendedContent: appendedContent,
|
||||
);
|
||||
|
||||
// Update local metadata
|
||||
final metadata = _streamMetadata[streamId];
|
||||
if (metadata != null) {
|
||||
metadata['lastUpdate'] = DateTime.now();
|
||||
metadata['lastChunkSequence'] =
|
||||
chunkSequence ?? metadata['lastChunkSequence'];
|
||||
if (appendedContent != null) {
|
||||
metadata['lastContent'] =
|
||||
(metadata['lastContent'] ?? '') + appendedContent;
|
||||
} else if (content != null) {
|
||||
metadata['lastContent'] = content;
|
||||
}
|
||||
metadata['suspended'] = false; // Mark as active
|
||||
}
|
||||
}
|
||||
|
||||
// Wake lock management
|
||||
void _enableWakeLock() async {
|
||||
try {
|
||||
await WakelockPlus.enable();
|
||||
DebugLogger.stream('wake-lock-enabled', scope: 'streaming/persistent');
|
||||
} catch (e) {
|
||||
DebugLogger.error(
|
||||
'wake-lock-enable-failed',
|
||||
scope: 'streaming/persistent',
|
||||
error: e,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
void _disableWakeLock() async {
|
||||
try {
|
||||
await WakelockPlus.disable();
|
||||
DebugLogger.stream('wake-lock-disabled', scope: 'streaming/persistent');
|
||||
} catch (e) {
|
||||
DebugLogger.error(
|
||||
'wake-lock-disable-failed',
|
||||
scope: 'streaming/persistent',
|
||||
error: e,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Get active stream count
|
||||
int get activeStreamCount => _activeStreams.length;
|
||||
|
||||
// Check if app is in background
|
||||
bool get isInBackground => _isInBackground;
|
||||
|
||||
// Get stream metadata
|
||||
Map<String, dynamic>? getStreamMetadata(String streamId) {
|
||||
return _streamMetadata[streamId];
|
||||
}
|
||||
|
||||
// Check if stream is suspended
|
||||
bool isStreamSuspended(String streamId) {
|
||||
final metadata = _streamMetadata[streamId];
|
||||
return metadata?['suspended'] == true;
|
||||
}
|
||||
|
||||
// Force recovery of a specific stream
|
||||
Future<void> forceRecoverStream(String streamId) async {
|
||||
final recoveryCallback = _streamRecoveryCallbacks[streamId];
|
||||
if (recoveryCallback != null) {
|
||||
_retryAttempts.remove(streamId); // Reset retry count
|
||||
await _attemptStreamRecovery(streamId, recoveryCallback);
|
||||
}
|
||||
}
|
||||
|
||||
// Cleanup
|
||||
void dispose() {
|
||||
WidgetsBinding.instance.removeObserver(this);
|
||||
_backgroundTimer?.cancel();
|
||||
_heartbeatTimer?.cancel();
|
||||
_connectivitySubscription?.cancel();
|
||||
_disableWakeLock();
|
||||
|
||||
// Stop all background execution
|
||||
if (_activeStreams.isNotEmpty) {
|
||||
_backgroundHandler.stopBackgroundExecution(_activeStreams.keys.toList());
|
||||
}
|
||||
|
||||
// Cancel all active streams
|
||||
for (final subscription in _activeStreams.values) {
|
||||
subscription.cancel();
|
||||
}
|
||||
_activeStreams.clear();
|
||||
|
||||
// Close all controllers
|
||||
for (final controller in _streamControllers.values) {
|
||||
if (!controller.isClosed) {
|
||||
controller.close();
|
||||
}
|
||||
}
|
||||
_streamControllers.clear();
|
||||
|
||||
// Clear all metadata
|
||||
_streamMetadata.clear();
|
||||
_streamRecoveryCallbacks.clear();
|
||||
_retryAttempts.clear();
|
||||
|
||||
// Clear background handler
|
||||
_backgroundHandler.clearAll();
|
||||
}
|
||||
}
|
||||
@@ -1,3 +1,5 @@
|
||||
import 'dart:async';
|
||||
|
||||
import 'package:flutter/widgets.dart';
|
||||
import 'package:socket_io_client/socket_io_client.dart' as io;
|
||||
|
||||
|
||||
@@ -1,202 +0,0 @@
|
||||
import 'dart:async';
|
||||
import 'dart:convert';
|
||||
import 'package:dio/dio.dart';
|
||||
import '../utils/debug_logger.dart';
|
||||
|
||||
/// Parser for Server-Sent Events (SSE) streaming responses.
|
||||
///
|
||||
/// This matches the web client's EventSourceParserStream behavior,
|
||||
/// parsing SSE data chunks and extracting OpenAI-compatible deltas.
|
||||
class SSEStreamParser {
|
||||
/// Parse an SSE response stream from Dio into text chunks.
|
||||
///
|
||||
/// Returns a stream of content strings extracted from OpenAI-style
|
||||
/// completion chunks.
|
||||
///
|
||||
/// [heartbeatTimeout] - Maximum time without data before considering
|
||||
/// the connection stale (default: 2 minutes)
|
||||
/// [onHeartbeat] - Callback invoked when any data is received
|
||||
static Stream<String> parseResponseStream(
|
||||
ResponseBody responseBody, {
|
||||
bool splitLargeDeltas = false,
|
||||
Duration heartbeatTimeout = const Duration(minutes: 2),
|
||||
void Function()? onHeartbeat,
|
||||
}) async* {
|
||||
DateTime lastDataReceived = DateTime.now();
|
||||
Timer? heartbeatTimer;
|
||||
|
||||
// Set up heartbeat monitoring
|
||||
if (heartbeatTimeout.inMilliseconds > 0) {
|
||||
heartbeatTimer = Timer.periodic(const Duration(seconds: 30), (timer) {
|
||||
final timeSinceLastData = DateTime.now().difference(lastDataReceived);
|
||||
if (timeSinceLastData > heartbeatTimeout) {
|
||||
DebugLogger.warning(
|
||||
'SSE stream heartbeat timeout: No data received for ${timeSinceLastData.inSeconds}s',
|
||||
data: {'timeout': heartbeatTimeout.inSeconds},
|
||||
);
|
||||
timer.cancel();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
try {
|
||||
// Buffer for accumulating incomplete SSE messages
|
||||
String buffer = '';
|
||||
|
||||
await for (final chunk in responseBody.stream) {
|
||||
// Update last data timestamp and invoke heartbeat callback
|
||||
lastDataReceived = DateTime.now();
|
||||
onHeartbeat?.call();
|
||||
|
||||
// Convert bytes to string (Dio ResponseBody.stream always emits Uint8List)
|
||||
final text = utf8.decode(chunk as List<int>, allowMalformed: true);
|
||||
buffer += text;
|
||||
|
||||
// Process complete SSE messages (delimited by double newline)
|
||||
final messages = buffer.split('\n\n');
|
||||
|
||||
// Keep the last (potentially incomplete) message in the buffer
|
||||
buffer = messages.removeLast();
|
||||
|
||||
for (final message in messages) {
|
||||
if (message.trim().isEmpty) continue;
|
||||
|
||||
// Parse SSE message
|
||||
final content = _parseSSEMessage(message);
|
||||
if (content != null) {
|
||||
if (content == '[DONE]') {
|
||||
// Stream completion signal
|
||||
DebugLogger.stream('SSE stream completed with [DONE] signal');
|
||||
return;
|
||||
}
|
||||
|
||||
// Split large deltas into smaller chunks for smoother UI updates
|
||||
if (splitLargeDeltas && content.length > 5) {
|
||||
yield* _splitIntoChunks(content);
|
||||
} else {
|
||||
yield content;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Process any remaining buffered data
|
||||
if (buffer.trim().isNotEmpty) {
|
||||
final content = _parseSSEMessage(buffer);
|
||||
if (content != null && content != '[DONE]') {
|
||||
yield content;
|
||||
}
|
||||
}
|
||||
} catch (e, stackTrace) {
|
||||
DebugLogger.error(
|
||||
'sse-parse-error',
|
||||
scope: 'streaming/sse',
|
||||
error: e,
|
||||
stackTrace: stackTrace,
|
||||
);
|
||||
rethrow;
|
||||
} finally {
|
||||
// Clean up heartbeat timer
|
||||
heartbeatTimer?.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse a single SSE message and extract content.
|
||||
static String? _parseSSEMessage(String message) {
|
||||
try {
|
||||
// SSE format: "data: <json>\n" or just the JSON
|
||||
String dataLine = message.trim();
|
||||
|
||||
// Remove "data: " prefix if present
|
||||
if (dataLine.startsWith('data: ')) {
|
||||
dataLine = dataLine.substring(6).trim();
|
||||
} else if (dataLine.startsWith('data:')) {
|
||||
dataLine = dataLine.substring(5).trim();
|
||||
}
|
||||
|
||||
// Handle [DONE] signal
|
||||
if (dataLine == '[DONE]' || dataLine == 'DONE') {
|
||||
return '[DONE]';
|
||||
}
|
||||
|
||||
// Skip empty data
|
||||
if (dataLine.isEmpty) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Parse JSON
|
||||
try {
|
||||
final json = jsonDecode(dataLine) as Map<String, dynamic>;
|
||||
|
||||
// Handle errors
|
||||
if (json['error'] != null) {
|
||||
DebugLogger.error(
|
||||
'sse-error-response',
|
||||
scope: 'streaming/sse',
|
||||
error: json['error'],
|
||||
);
|
||||
return null;
|
||||
}
|
||||
|
||||
// Extract content from OpenAI-style response
|
||||
// Format: { choices: [{ delta: { content: "..." } }] }
|
||||
final choices = json['choices'];
|
||||
if (choices is List && choices.isNotEmpty) {
|
||||
final choice = choices.first as Map<String, dynamic>?;
|
||||
if (choice != null) {
|
||||
final delta = choice['delta'] as Map<String, dynamic>?;
|
||||
if (delta != null) {
|
||||
final content = delta['content'];
|
||||
if (content is String && content.isNotEmpty) {
|
||||
return content;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Alternative format: { content: "..." }
|
||||
final directContent = json['content'];
|
||||
if (directContent is String && directContent.isNotEmpty) {
|
||||
return directContent;
|
||||
}
|
||||
|
||||
return null;
|
||||
} on FormatException catch (e) {
|
||||
DebugLogger.warning(
|
||||
'Failed to parse SSE JSON: $dataLine',
|
||||
data: {'error': e.toString()},
|
||||
);
|
||||
return null;
|
||||
}
|
||||
} catch (e) {
|
||||
DebugLogger.error(
|
||||
'sse-message-parse-error',
|
||||
scope: 'streaming/sse',
|
||||
error: e,
|
||||
data: {'message': message},
|
||||
);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/// Split large content into smaller chunks for smoother streaming.
|
||||
/// This matches the web client's streamLargeDeltasAsRandomChunks behavior.
|
||||
static Stream<String> _splitIntoChunks(String content) async* {
|
||||
var remaining = content;
|
||||
|
||||
while (remaining.isNotEmpty) {
|
||||
// Random chunk size between 1-3 characters
|
||||
final chunkSize = (remaining.length < 3)
|
||||
? remaining.length
|
||||
: 1 + (DateTime.now().millisecond % 3);
|
||||
|
||||
final chunk = remaining.substring(0, chunkSize);
|
||||
yield chunk;
|
||||
|
||||
// Small delay for smoother visual effect (matching web client)
|
||||
await Future.delayed(const Duration(milliseconds: 5));
|
||||
|
||||
remaining = remaining.substring(chunkSize);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -5,7 +5,6 @@ import 'package:flutter/material.dart';
|
||||
|
||||
import '../../core/models/chat_message.dart';
|
||||
import '../../core/models/socket_event.dart';
|
||||
import '../../core/services/persistent_streaming_service.dart';
|
||||
import '../../core/services/socket_service.dart';
|
||||
import '../../core/utils/inactivity_watchdog.dart';
|
||||
import '../../core/utils/tool_calls_parser.dart';
|
||||
@@ -163,86 +162,44 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
|
||||
required void Function() finishStreaming,
|
||||
required List<ChatMessage> Function() getMessages,
|
||||
}) {
|
||||
// Persistable controller to survive brief app suspensions
|
||||
// Track if streaming has been finished to avoid duplicate cleanup
|
||||
bool hasFinished = false;
|
||||
|
||||
// Wrap finishStreaming to always clear the cancel token
|
||||
void wrappedFinishStreaming() {
|
||||
if (hasFinished) return;
|
||||
hasFinished = true;
|
||||
api.clearStreamCancelToken(assistantMessageId);
|
||||
finishStreaming();
|
||||
}
|
||||
|
||||
// Controller for forwarding data to StreamingResponseController
|
||||
// With WebSocket-only streaming, the HTTP stream closes immediately after returning task_id.
|
||||
// All actual content comes via WebSocket events, so we don't need persistent stream tracking.
|
||||
final persistentController = StreamController<String>.broadcast();
|
||||
final persistentService = PersistentStreamingService();
|
||||
|
||||
// Track if stream has received any data
|
||||
bool hasReceivedData = false;
|
||||
|
||||
// Create subscription first so we can reference it in onDone
|
||||
late final String streamId;
|
||||
final subscription = stream.listen(
|
||||
// Subscribe to HTTP stream (mainly for error handling - content comes via WebSocket)
|
||||
final httpSubscription = stream.listen(
|
||||
(data) {
|
||||
hasReceivedData = true;
|
||||
// Forward any HTTP stream data (rare with WebSocket-only)
|
||||
persistentController.add(data);
|
||||
},
|
||||
onDone: () async {
|
||||
onDone: () {
|
||||
DebugLogger.stream(
|
||||
'Source stream onDone fired, hasReceivedData=$hasReceivedData',
|
||||
);
|
||||
|
||||
// If stream closes immediately without data, it's likely due to backgrounding/network drop
|
||||
// Not a natural completion
|
||||
if (!hasReceivedData) {
|
||||
DebugLogger.stream(
|
||||
'Stream closed without data - likely interrupted, not completing',
|
||||
);
|
||||
// Check if app is backgrounding - if so, finish streaming with whatever we have
|
||||
await Future.delayed(const Duration(milliseconds: 300));
|
||||
if (persistentService.isInBackground) {
|
||||
DebugLogger.stream(
|
||||
'App backgrounding during stream - finishing with current content',
|
||||
);
|
||||
finishStreaming();
|
||||
}
|
||||
// Don't close the controller to prevent cascading completion handlers
|
||||
return;
|
||||
}
|
||||
|
||||
// For streams with data, delay to allow background detection
|
||||
await Future.delayed(const Duration(milliseconds: 500));
|
||||
|
||||
final isInBg = persistentService.isInBackground;
|
||||
DebugLogger.stream(
|
||||
'Stream onDone check: streamId=$streamId, isInBackground=$isInBg',
|
||||
);
|
||||
|
||||
// Check if we're in background before closing
|
||||
if (!isInBg) {
|
||||
DebugLogger.stream(
|
||||
'Closing stream controller for $streamId (foreground completion)',
|
||||
'HTTP stream completed - WebSocket handles content delivery',
|
||||
);
|
||||
// Close the controller to trigger StreamingResponseController.onComplete
|
||||
// WebSocket events continue independently via socket subscriptions
|
||||
if (!persistentController.isClosed) {
|
||||
persistentController.close();
|
||||
} else {
|
||||
DebugLogger.stream(
|
||||
'Source stream completed in background for $streamId - keeping open for recovery',
|
||||
);
|
||||
// Finish streaming to save the content we have
|
||||
finishStreaming();
|
||||
}
|
||||
},
|
||||
onError: persistentController.addError,
|
||||
);
|
||||
|
||||
streamId = persistentService.registerStream(
|
||||
subscription: subscription,
|
||||
controller: persistentController,
|
||||
recoveryCallback: () async {
|
||||
DebugLogger.log(
|
||||
'Attempting to recover interrupted stream',
|
||||
scope: 'streaming/helper',
|
||||
);
|
||||
},
|
||||
metadata: {
|
||||
'conversationId': activeConversationId,
|
||||
'messageId': assistantMessageId,
|
||||
'modelId': modelId,
|
||||
},
|
||||
);
|
||||
api.registerPersistentStreamForMessage(assistantMessageId, streamId);
|
||||
|
||||
InactivityWatchdog? socketWatchdog;
|
||||
// Socket subscriptions list - starts empty so non-socket flows can finish via onComplete.
|
||||
// HTTP subscription is tracked separately and cleaned up in disposeSocketSubscriptions.
|
||||
final socketSubscriptions = <VoidCallback>[];
|
||||
final hasSocketSignals =
|
||||
socketService != null || registerDeltaListener != null;
|
||||
@@ -268,7 +225,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
|
||||
if (msgs.isNotEmpty &&
|
||||
msgs.last.role == 'assistant' &&
|
||||
msgs.last.isStreaming) {
|
||||
finishStreaming();
|
||||
wrappedFinishStreaming();
|
||||
}
|
||||
} catch (_) {}
|
||||
socketWatchdog?.stop();
|
||||
@@ -284,15 +241,19 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
|
||||
int imageCollectionRequestId = 0;
|
||||
|
||||
void disposeSocketSubscriptions() {
|
||||
if (socketSubscriptions.isEmpty) {
|
||||
return;
|
||||
}
|
||||
// Cancel HTTP subscription
|
||||
try {
|
||||
httpSubscription.cancel();
|
||||
} catch (_) {}
|
||||
|
||||
// Cancel socket subscriptions
|
||||
for (final dispose in socketSubscriptions) {
|
||||
try {
|
||||
dispose();
|
||||
} catch (_) {}
|
||||
}
|
||||
socketSubscriptions.clear();
|
||||
|
||||
imageCollectionDebounce?.cancel();
|
||||
imageCollectionDebounce = null;
|
||||
pendingImageContent = null;
|
||||
@@ -481,7 +442,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
|
||||
sessionId: sessionId,
|
||||
);
|
||||
} catch (_) {}
|
||||
finishStreaming();
|
||||
wrappedFinishStreaming();
|
||||
socketWatchdog?.stop();
|
||||
return;
|
||||
}
|
||||
@@ -502,7 +463,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
|
||||
sessionId: sessionId,
|
||||
);
|
||||
} catch (_) {}
|
||||
finishStreaming();
|
||||
wrappedFinishStreaming();
|
||||
socketWatchdog?.stop();
|
||||
return;
|
||||
}
|
||||
@@ -563,7 +524,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
|
||||
try {
|
||||
socketService?.offEvent(channel);
|
||||
} catch (_) {}
|
||||
finishStreaming();
|
||||
wrappedFinishStreaming();
|
||||
socketWatchdog?.stop();
|
||||
return;
|
||||
}
|
||||
@@ -781,13 +742,13 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
|
||||
}
|
||||
} catch (_) {
|
||||
} finally {
|
||||
finishStreaming();
|
||||
wrappedFinishStreaming();
|
||||
}
|
||||
});
|
||||
return;
|
||||
}
|
||||
}
|
||||
finishStreaming();
|
||||
wrappedFinishStreaming();
|
||||
socketWatchdog?.stop();
|
||||
}
|
||||
}
|
||||
@@ -816,7 +777,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
|
||||
});
|
||||
}
|
||||
disposeSocketSubscriptions();
|
||||
finishStreaming();
|
||||
wrappedFinishStreaming();
|
||||
} else if (type == 'chat:message:follow_ups' && payload != null) {
|
||||
DebugLogger.log('Received follow-ups event', scope: 'streaming/helper');
|
||||
final followMap = _asStringMap(payload);
|
||||
@@ -966,7 +927,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
|
||||
return message.copyWith(statusHistory: filtered);
|
||||
});
|
||||
// Ensure UI exits streaming state
|
||||
finishStreaming();
|
||||
wrappedFinishStreaming();
|
||||
socketWatchdog?.stop();
|
||||
} else if ((type == 'chat:message:delta' || type == 'message') &&
|
||||
payload != null) {
|
||||
@@ -1246,17 +1207,11 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
|
||||
}
|
||||
},
|
||||
onComplete: () {
|
||||
api.clearPersistentStreamForMessage(
|
||||
assistantMessageId,
|
||||
expectedStreamId: streamId,
|
||||
);
|
||||
// Unregister from persistent service
|
||||
persistentService.unregisterStream(streamId);
|
||||
|
||||
// Stream completion without socket subscriptions indicates a simple flow
|
||||
// For WebSocket flows, completion should be handled by socket events (done: true)
|
||||
// HTTP stream completed - cleanup already done in onDone handler.
|
||||
// For WebSocket flows, actual completion is handled by socket events (done: true).
|
||||
// Only finish streaming here if there are no socket subscriptions (simple/legacy flow).
|
||||
if (socketSubscriptions.isEmpty) {
|
||||
finishStreaming();
|
||||
wrappedFinishStreaming();
|
||||
Future.microtask(refreshConversationSnapshot);
|
||||
}
|
||||
},
|
||||
@@ -1272,14 +1227,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
|
||||
},
|
||||
);
|
||||
|
||||
api.clearPersistentStreamForMessage(
|
||||
assistantMessageId,
|
||||
expectedStreamId: streamId,
|
||||
);
|
||||
try {
|
||||
persistentService.unregisterStream(streamId);
|
||||
} catch (_) {}
|
||||
|
||||
// Check if this is a recoverable error (network issues, etc.)
|
||||
final errorText = error.toString();
|
||||
final isRecoverable =
|
||||
@@ -1303,7 +1250,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
|
||||
}
|
||||
|
||||
disposeSocketSubscriptions();
|
||||
finishStreaming();
|
||||
wrappedFinishStreaming();
|
||||
Future.microtask(refreshConversationSnapshot);
|
||||
socketWatchdog?.stop();
|
||||
},
|
||||
|
||||
@@ -17,7 +17,7 @@ typedef StreamingErrorCallback =
|
||||
///
|
||||
/// This wraps a [StreamSubscription], normalises error handling, and exposes
|
||||
/// a unified cancel method so UI layers can stop streaming without having to
|
||||
/// know the underlying transport (SSE, polling, etc.).
|
||||
/// know the underlying transport (WebSocket, polling, etc.).
|
||||
class StreamingResponseController {
|
||||
StreamingResponseController({
|
||||
required Stream<String> stream,
|
||||
|
||||
@@ -34,6 +34,15 @@ final chatMessagesProvider =
|
||||
ChatMessagesNotifier.new,
|
||||
);
|
||||
|
||||
/// Whether chat is currently streaming a response.
|
||||
/// Used by router to avoid showing connection issues during active streaming.
|
||||
final isChatStreamingProvider = Provider<bool>((ref) {
|
||||
final messages = ref.watch(chatMessagesProvider);
|
||||
if (messages.isEmpty) return false;
|
||||
final last = messages.last;
|
||||
return last.role == 'assistant' && last.isStreaming;
|
||||
});
|
||||
|
||||
// Loading state for conversation (used to show chat skeletons during fetch)
|
||||
@Riverpod(keepAlive: true)
|
||||
class IsLoadingConversation extends _$IsLoadingConversation {
|
||||
@@ -1381,23 +1390,44 @@ Future<void> regenerateMessage(
|
||||
'tags': <dynamic>[],
|
||||
};
|
||||
|
||||
// Socket binding for background flows
|
||||
// WebSocket-only streaming requires socket connection
|
||||
final socketService = ref.read(socketServiceProvider);
|
||||
String? socketSessionId = socketService?.sessionId;
|
||||
bool wantSessionBinding =
|
||||
(socketService?.isConnected == true) &&
|
||||
if (socketService == null) {
|
||||
// No socket service available
|
||||
ref.read(chatMessagesProvider.notifier).updateLastMessageWithFunction((
|
||||
m,
|
||||
) {
|
||||
return m.copyWith(
|
||||
content: 'Connection not available. Please try again later.',
|
||||
isStreaming: false,
|
||||
);
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
// Ensure socket is connected (with 10s timeout)
|
||||
if (!socketService.isConnected) {
|
||||
final connected = await socketService.ensureConnected(
|
||||
timeout: const Duration(seconds: 10),
|
||||
);
|
||||
if (!connected) {
|
||||
ref.read(chatMessagesProvider.notifier).updateLastMessageWithFunction((
|
||||
m,
|
||||
) {
|
||||
return m.copyWith(
|
||||
content:
|
||||
'Unable to connect to server. Please check your connection and try again.',
|
||||
isStreaming: false,
|
||||
);
|
||||
});
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
final socketSessionId = socketService.sessionId;
|
||||
final bool wantSessionBinding =
|
||||
socketService.isConnected &&
|
||||
(socketSessionId != null && socketSessionId.isNotEmpty);
|
||||
// When regenerating with tools, make a best-effort to ensure a live socket.
|
||||
if (!wantSessionBinding && socketService != null) {
|
||||
try {
|
||||
final ok = await socketService.ensureConnected();
|
||||
if (ok) {
|
||||
socketSessionId = socketService.sessionId;
|
||||
wantSessionBinding =
|
||||
socketSessionId != null && socketSessionId.isNotEmpty;
|
||||
}
|
||||
} catch (_) {}
|
||||
}
|
||||
|
||||
// Resolve tool servers from user settings (if any)
|
||||
List<Map<String, dynamic>>? toolServers;
|
||||
@@ -1963,12 +1993,46 @@ Future<void> _sendMessageInternal(
|
||||
'tags': <dynamic>[],
|
||||
};
|
||||
|
||||
// Stream response using server-push via Socket when available, otherwise fallback
|
||||
// Resolve Socket session for background tasks parity
|
||||
// WebSocket-only streaming requires socket connection.
|
||||
// Wait for connection with timeout before proceeding.
|
||||
final socketService = ref.read(socketServiceProvider);
|
||||
final socketSessionId = socketService?.sessionId;
|
||||
if (socketService == null) {
|
||||
// No socket service available at all
|
||||
ref.read(chatMessagesProvider.notifier).updateLastMessageWithFunction((
|
||||
m,
|
||||
) {
|
||||
return m.copyWith(
|
||||
content: 'Connection not available. Please try again later.',
|
||||
isStreaming: false,
|
||||
);
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
// Ensure socket is connected (with 10s timeout for initial connection)
|
||||
if (!socketService.isConnected) {
|
||||
final connected = await socketService.ensureConnected(
|
||||
timeout: const Duration(seconds: 10),
|
||||
);
|
||||
if (!connected) {
|
||||
// Socket connection failed - cannot stream without it
|
||||
ref.read(chatMessagesProvider.notifier).updateLastMessageWithFunction((
|
||||
m,
|
||||
) {
|
||||
return m.copyWith(
|
||||
content:
|
||||
'Unable to connect to server. Please check your connection and try again.',
|
||||
isStreaming: false,
|
||||
);
|
||||
});
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Socket is now connected - resolve session for background tasks parity
|
||||
final socketSessionId = socketService.sessionId;
|
||||
final bool wantSessionBinding =
|
||||
(socketService?.isConnected == true) &&
|
||||
socketService.isConnected &&
|
||||
(socketSessionId != null && socketSessionId.isNotEmpty);
|
||||
|
||||
// Resolve tool servers from user settings (if any)
|
||||
@@ -2045,7 +2109,7 @@ Future<void> _sendMessageInternal(
|
||||
final effectiveSessionId =
|
||||
response.socketSessionId ?? socketSessionId ?? sessionId;
|
||||
|
||||
// Use unified streaming helper for SSE/WebSocket handling
|
||||
// Use unified streaming helper for WebSocket handling
|
||||
final bool isBackgroundFlow = response.isBackgroundFlow;
|
||||
|
||||
try {
|
||||
@@ -2500,7 +2564,7 @@ final stopGenerationProvider = Provider<void Function()>((ref) {
|
||||
messages.last.isStreaming) {
|
||||
final lastId = messages.last.id;
|
||||
|
||||
// Cancel the network stream (SSE) if active
|
||||
// Cancel the network stream if active
|
||||
final api = ref.read(apiServiceProvider);
|
||||
api?.cancelStreamingMessage(lastId);
|
||||
|
||||
|
||||
@@ -1254,7 +1254,7 @@ class _ChatPageState extends ConsumerState<ChatPage> {
|
||||
|
||||
try {
|
||||
// If assistant message has generated images and it's the last message,
|
||||
// use image-only regenerate flow instead of text SSE regeneration
|
||||
// use image-only regenerate flow instead of text streaming regeneration
|
||||
if (message.role == 'assistant' &&
|
||||
(message.files?.any((f) => f['type'] == 'image') == true) &&
|
||||
messageIndex == messages.length - 1) {
|
||||
|
||||
Reference in New Issue
Block a user