From 7fb199b2e486898016b90d33f568fbceb838185c Mon Sep 17 00:00:00 2001 From: cogwheel0 <172976095+cogwheel0@users.noreply.github.com> Date: Tue, 28 Oct 2025 13:59:17 +0530 Subject: [PATCH] feat: implement service failure handling in background streaming - Added a method to send failure notifications to Flutter when the background service fails to enter the foreground. - Implemented a broadcast receiver to handle service failure notifications and notify Flutter about the failure. - Enhanced the persistent streaming service to attempt recovery for failed streams. - Introduced heartbeat monitoring for SSE streams to detect stale connections and trigger recovery actions. --- .../conduit/BackgroundStreamingHandler.kt | 53 +++++++++++++ lib/core/services/api_service.dart | 67 +++++++++++++--- .../background_streaming_handler.dart | 29 ++++++- .../persistent_streaming_service.dart | 78 ++++++++++++++++++- lib/core/services/sse_stream_parser.dart | 33 ++++++++ lib/core/services/streaming_helper.dart | 24 ++++-- .../chat/providers/chat_providers.dart | 6 +- 7 files changed, 265 insertions(+), 25 deletions(-) diff --git a/android/app/src/main/kotlin/app/cogwheel/conduit/BackgroundStreamingHandler.kt b/android/app/src/main/kotlin/app/cogwheel/conduit/BackgroundStreamingHandler.kt index cc5f820..18ed17d 100644 --- a/android/app/src/main/kotlin/app/cogwheel/conduit/BackgroundStreamingHandler.kt +++ b/android/app/src/main/kotlin/app/cogwheel/conduit/BackgroundStreamingHandler.kt @@ -102,9 +102,19 @@ class BackgroundStreamingService : Service() { } catch (e: Exception) { // Catch all exceptions including ForegroundServiceStartNotAllowedException println("BackgroundStreamingService: Failed to enter foreground: ${e.javaClass.simpleName}: ${e.message}") + // Notify Flutter about the failure + sendFailureNotification(e) false } } + + private fun sendFailureNotification(e: Exception) { + // Send broadcast intent to notify MainActivity + val intent = Intent("app.cogwheel.conduit.FOREGROUND_SERVICE_FAILED") + intent.putExtra("error", e.message ?: "Unknown error") + intent.putExtra("errorType", e.javaClass.simpleName) + sendBroadcast(intent) + } private fun updateForegroundType(notification: Notification, type: Int) { if (Build.VERSION.SDK_INT < Build.VERSION_CODES.Q) return @@ -248,6 +258,7 @@ class BackgroundStreamingHandler(private val activity: MainActivity) : MethodCal private val streamsRequiringMic = mutableSetOf() private var backgroundJob: Job? = null private val scope = CoroutineScope(Dispatchers.Main + SupervisorJob()) + private var serviceFailureReceiver: android.content.BroadcastReceiver? = null companion object { private const val CHANNEL_NAME = "conduit/background_streaming" @@ -262,6 +273,38 @@ class BackgroundStreamingHandler(private val activity: MainActivity) : MethodCal sharedPrefs = context.getSharedPreferences(PREFS_NAME, Context.MODE_PRIVATE) createNotificationChannel() + setupServiceFailureReceiver() + } + + private fun setupServiceFailureReceiver() { + serviceFailureReceiver = object : android.content.BroadcastReceiver() { + override fun onReceive(context: Context?, intent: Intent?) { + if (intent?.action == "app.cogwheel.conduit.FOREGROUND_SERVICE_FAILED") { + val error = intent.getStringExtra("error") ?: "Unknown error" + val errorType = intent.getStringExtra("errorType") ?: "Exception" + + println("BackgroundStreamingHandler: Service failure received: $errorType - $error") + + // Notify Flutter about the service failure + channel.invokeMethod("serviceFailed", mapOf( + "error" to error, + "errorType" to errorType, + "streamIds" to activeStreams.toList() + )) + + // Clear active streams since service failed + activeStreams.clear() + streamsRequiringMic.clear() + } + } + } + + val filter = android.content.IntentFilter("app.cogwheel.conduit.FOREGROUND_SERVICE_FAILED") + if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.TIRAMISU) { + context.registerReceiver(serviceFailureReceiver, filter, Context.RECEIVER_NOT_EXPORTED) + } else { + context.registerReceiver(serviceFailureReceiver, filter) + } } override fun onMethodCall(call: MethodCall, result: Result) { @@ -514,5 +557,15 @@ class BackgroundStreamingHandler(private val activity: MainActivity) : MethodCal scope.cancel() stopBackgroundMonitoring() stopForegroundService() + + // Unregister broadcast receiver + try { + serviceFailureReceiver?.let { + context.unregisterReceiver(it) + } + } catch (e: Exception) { + println("BackgroundStreamingHandler: Error unregistering receiver: ${e.message}") + } + serviceFailureReceiver = null } } diff --git a/lib/core/services/api_service.dart b/lib/core/services/api_service.dart index e42649f..eca3729 100644 --- a/lib/core/services/api_service.dart +++ b/lib/core/services/api_service.dart @@ -3067,6 +3067,29 @@ class ApiService { final Map _streamCancelTokens = {}; final Map _messagePersistentStreamIds = {}; + /// Associates a streaming message with its persistent stream identifier. + void registerPersistentStreamForMessage(String messageId, String streamId) { + _messagePersistentStreamIds[messageId] = streamId; + } + + /// Removes the persistent stream mapping for a message if it matches. + /// + /// Returns the removed persistent stream identifier when one existed and + /// matched the optional [expectedStreamId]. + String? clearPersistentStreamForMessage( + String messageId, { + String? expectedStreamId, + }) { + final current = _messagePersistentStreamIds[messageId]; + if (current == null) { + return null; + } + if (expectedStreamId != null && current != expectedStreamId) { + return null; + } + return _messagePersistentStreamIds.remove(messageId); + } + // Send message using dual-stream approach (HTTP SSE + WebSocket events). // Matches OpenWebUI web client behavior: // - HTTP SSE stream provides immediate content chunks @@ -3205,15 +3228,19 @@ class ApiService { try { final userParams = userSettings?['params'] as Map?; final functionCallingMode = userParams?['function_calling'] as String?; - + if (functionCallingMode != null) { final params = (data['params'] as Map?) ?? {}; params['function_calling'] = functionCallingMode; data['params'] = params; - _traceApi('Set params.function_calling = $functionCallingMode (from user settings)'); + _traceApi( + 'Set params.function_calling = $functionCallingMode (from user settings)', + ); } else { - _traceApi('No function_calling preference in user settings, backend will use default mode'); + _traceApi( + 'No function_calling preference in user settings, backend will use default mode', + ); } } catch (_) { // Non-fatal; continue without setting function_calling mode @@ -3288,20 +3315,29 @@ class ApiService { data: data, options: Options( responseType: ResponseType.stream, + // Extended timeout for streaming responses - allow up to 10 minutes + // for long-running tool calls and reasoning + receiveTimeout: const Duration(minutes: 10), + // Shorter send timeout for the initial request + sendTimeout: const Duration(seconds: 30), headers: { 'Accept': 'text/event-stream', + // Enable HTTP keep-alive to maintain connection in background + 'Connection': 'keep-alive', + // Request server to send keep-alive messages + 'Cache-Control': 'no-cache', }, ), cancelToken: cancelToken, ); final respData = resp.data; - + // Check if we got a task_id response (non-streaming) if (respData is Map && respData['task_id'] != null) { final taskId = respData['task_id'].toString(); _traceApi('Background task created: $taskId'); - + // In this case, all streaming will happen via WebSocket // Close HTTP stream but keep WebSocket active if (!streamController.isClosed) { @@ -3309,15 +3345,26 @@ class ApiService { } return; } - + // We have a streaming response body if (respData is ResponseBody) { _traceApi('HTTP SSE stream started for message: $messageId'); - + // Parse SSE stream and forward chunks to controller await for (final chunk in SSEStreamParser.parseResponseStream( respData, splitLargeDeltas: false, + heartbeatTimeout: const Duration(minutes: 2), + onHeartbeat: () { + // Notify persistent streaming service that connection is alive + final persistentStreamId = _messagePersistentStreamIds[messageId]; + if (persistentStreamId != null) { + PersistentStreamingService().updateStreamProgress( + persistentStreamId, + chunkSequence: DateTime.now().millisecondsSinceEpoch, + ); + } + }, )) { if (!streamController.isClosed) { streamController.add(chunk); @@ -3326,12 +3373,12 @@ class ApiService { break; } } - + _traceApi('HTTP SSE stream completed for message: $messageId'); } else { _traceApi('Unexpected response type: ${respData.runtimeType}'); } - + // Close the HTTP stream controller // WebSocket events will continue independently via streaming_helper if (!streamController.isClosed) { @@ -3399,7 +3446,7 @@ class ApiService { } catch (_) {} try { - final pid = _messagePersistentStreamIds.remove(messageId); + final pid = clearPersistentStreamForMessage(messageId); if (pid != null) { PersistentStreamingService().unregisterStream(pid); } diff --git a/lib/core/services/background_streaming_handler.dart b/lib/core/services/background_streaming_handler.dart index 57ea40f..47d33c6 100644 --- a/lib/core/services/background_streaming_handler.dart +++ b/lib/core/services/background_streaming_handler.dart @@ -27,9 +27,11 @@ class BackgroundStreamingHandler { void Function(List streamIds)? onStreamsSuspending; void Function()? onBackgroundTaskExpiring; void Function(List streamIds, int estimatedSeconds)? - onBackgroundTaskExtended; + onBackgroundTaskExtended; void Function()? onBackgroundKeepAlive; bool Function()? shouldContinueInBackground; + void Function(String error, String errorType, List streamIds)? + onServiceFailed; void _setupMethodCallHandler() { _channel.setMethodCallHandler((call) async { @@ -79,6 +81,31 @@ class BackgroundStreamingHandler { DebugLogger.stream('keepalive-signal', scope: 'background'); onBackgroundKeepAlive?.call(); break; + + case 'serviceFailed': + final Map args = + call.arguments as Map; + final String error = args['error'] as String? ?? 'Unknown error'; + final String errorType = args['errorType'] as String? ?? 'Exception'; + final List streamIds = + (args['streamIds'] as List?)?.cast() ?? []; + + DebugLogger.error( + 'service-failed', + scope: 'background', + error: error, + data: {'type': errorType, 'streams': streamIds.length}, + ); + + // Notify callback about service failure + onServiceFailed?.call(error, errorType, streamIds); + + // Clean up failed streams + for (final streamId in streamIds) { + _activeStreamIds.remove(streamId); + _streamStates.remove(streamId); + } + break; } }); } diff --git a/lib/core/services/persistent_streaming_service.dart b/lib/core/services/persistent_streaming_service.dart index b7ec066..73c9f3f 100644 --- a/lib/core/services/persistent_streaming_service.dart +++ b/lib/core/services/persistent_streaming_service.dart @@ -46,6 +46,28 @@ class PersistentStreamingService with WidgetsBindingObserver { } void _setupBackgroundHandlerCallbacks() { + _backgroundHandler.onServiceFailed = (error, errorType, streamIds) { + DebugLogger.error( + 'background-service-failed', + scope: 'streaming/persistent', + error: '$errorType: $error', + data: {'affectedStreams': streamIds}, + ); + + // Attempt immediate recovery for failed streams + for (final streamId in streamIds) { + final callback = _streamRecoveryCallbacks[streamId]; + if (callback != null) { + // Schedule recovery after a short delay + Future.delayed(const Duration(seconds: 2), () { + if (_activeStreams.containsKey(streamId)) { + _attemptStreamRecovery(streamId, callback); + } + }); + } + } + }; + _backgroundHandler.onStreamsSuspending = (streamIds) { DebugLogger.stream( 'PersistentStreaming: Streams suspending - $streamIds', @@ -123,9 +145,46 @@ class PersistentStreamingService with WidgetsBindingObserver { _heartbeatTimer = Timer.periodic(const Duration(seconds: 30), (_) { if (_activeStreams.isNotEmpty && _isInBackground) { _backgroundHandler.keepAlive(); + + // Check for stale streams during background operation + _checkStreamHealth(); } }); } + + void _checkStreamHealth() { + final now = DateTime.now(); + final staleStreams = []; + + for (final entry in _streamMetadata.entries) { + final streamId = entry.key; + final metadata = entry.value; + final lastUpdate = metadata['lastUpdate'] as DateTime?; + + if (lastUpdate != null) { + final timeSinceUpdate = now.difference(lastUpdate); + + // If no update in 90 seconds while in background, consider stale + if (timeSinceUpdate > const Duration(seconds: 90)) { + DebugLogger.warning( + 'Stream $streamId appears stale: ${timeSinceUpdate.inSeconds}s since last update', + ); + staleStreams.add(streamId); + } + } + } + + // Attempt recovery for stale streams + for (final streamId in staleStreams) { + final callback = _streamRecoveryCallbacks[streamId]; + if (callback != null && _retryAttempts[streamId] == null) { + DebugLogger.stream( + 'Initiating recovery for stale stream: $streamId', + ); + _attemptStreamRecovery(streamId, callback); + } + } + } @override void didChangeAppLifecycleState(AppLifecycleState state) { @@ -385,13 +444,26 @@ class PersistentStreamingService with WidgetsBindingObserver { final metadata = _streamMetadata[streamId]; if (metadata == null) return false; + // Check if stream is marked as suspended + if (metadata['suspended'] == true) { + final suspendedAt = metadata['suspendedAt'] as DateTime?; + if (suspendedAt != null) { + final timeSinceSuspend = DateTime.now().difference(suspendedAt); + // Try to recover suspended streams after 10 seconds + return timeSinceSuspend > const Duration(seconds: 10); + } + } + // Check if stream has been inactive for too long final lastUpdate = metadata['lastUpdate'] as DateTime?; if (lastUpdate != null) { final timeSinceUpdate = DateTime.now().difference(lastUpdate); - // Align with app-side watchdogs: be less aggressive than UI guard - // but still attempt recovery before server timeouts become likely. - return timeSinceUpdate > const Duration(minutes: 2); + // In background: 90 seconds + // In foreground: 2 minutes + final threshold = _isInBackground + ? const Duration(seconds: 90) + : const Duration(minutes: 2); + return timeSinceUpdate > threshold; } return false; diff --git a/lib/core/services/sse_stream_parser.dart b/lib/core/services/sse_stream_parser.dart index 4a16b67..f71cd99 100644 --- a/lib/core/services/sse_stream_parser.dart +++ b/lib/core/services/sse_stream_parser.dart @@ -12,15 +12,45 @@ class SSEStreamParser { /// /// Returns a stream of content strings extracted from OpenAI-style /// completion chunks. + /// + /// [heartbeatTimeout] - Maximum time without data before considering + /// the connection stale (default: 2 minutes) + /// [onHeartbeat] - Callback invoked when any data is received static Stream parseResponseStream( ResponseBody responseBody, { bool splitLargeDeltas = false, + Duration heartbeatTimeout = const Duration(minutes: 2), + void Function()? onHeartbeat, }) async* { + DateTime lastDataReceived = DateTime.now(); + Timer? heartbeatTimer; + + // Set up heartbeat monitoring + if (heartbeatTimeout.inMilliseconds > 0) { + heartbeatTimer = Timer.periodic( + const Duration(seconds: 30), + (timer) { + final timeSinceLastData = DateTime.now().difference(lastDataReceived); + if (timeSinceLastData > heartbeatTimeout) { + DebugLogger.warning( + 'SSE stream heartbeat timeout: No data received for ${timeSinceLastData.inSeconds}s', + data: {'timeout': heartbeatTimeout.inSeconds}, + ); + timer.cancel(); + } + }, + ); + } + try { // Buffer for accumulating incomplete SSE messages String buffer = ''; await for (final chunk in responseBody.stream) { + // Update last data timestamp and invoke heartbeat callback + lastDataReceived = DateTime.now(); + onHeartbeat?.call(); + // Convert bytes to string (Dio ResponseBody.stream always emits Uint8List) final text = utf8.decode(chunk as List, allowMalformed: true); buffer += text; @@ -68,6 +98,9 @@ class SSEStreamParser { stackTrace: stackTrace, ); rethrow; + } finally { + // Clean up heartbeat timer + heartbeatTimer?.cancel(); } } diff --git a/lib/core/services/streaming_helper.dart b/lib/core/services/streaming_helper.dart index 54d7e36..b023918 100644 --- a/lib/core/services/streaming_helper.dart +++ b/lib/core/services/streaming_helper.dart @@ -16,6 +16,7 @@ import '../../shared/theme/theme_extensions.dart'; import '../utils/debug_logger.dart'; import '../utils/openwebui_source_parser.dart'; import 'streaming_response_controller.dart'; +import 'api_service.dart'; // Keep local verbosity toggle for socket logs const bool kSocketVerboseLogging = false; @@ -67,7 +68,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ required Map modelItem, required String sessionId, required String? activeConversationId, - required dynamic api, + required ApiService api, required SocketService? socketService, RegisterConversationDeltaListener? registerDeltaListener, // Message update callbacks @@ -169,6 +170,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ 'modelId': modelId, }, ); + api.registerPersistentStreamForMessage(assistantMessageId, streamId); InactivityWatchdog? socketWatchdog; final socketSubscriptions = []; @@ -318,8 +320,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ if (chatId == null || chatId.isEmpty) { return; } - if (api == null) return; - refreshingSnapshot = true; try { final conversation = await api.getConversation(chatId); @@ -376,7 +376,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ try { // Fire and forget // ignore: unawaited_futures - api?.sendChatCompleted( + api.sendChatCompleted( chatId: activeConversationId ?? '', messageId: assistantMessageId, messages: const [], @@ -397,7 +397,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ } catch (_) {} try { // ignore: unawaited_futures - api?.sendChatCompleted( + api.sendChatCompleted( chatId: activeConversationId ?? '', messageId: assistantMessageId, messages: const [], @@ -594,7 +594,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ if (payload['done'] == true) { try { // ignore: unawaited_futures - api?.sendChatCompleted( + api.sendChatCompleted( chatId: activeConversationId ?? '', messageId: assistantMessageId, messages: const [], @@ -614,8 +614,8 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ try { final chatId = activeConversationId; if (chatId != null && chatId.isNotEmpty) { - final resp = await api?.dio.get('/api/v1/chats/$chatId'); - final data = resp?.data as Map?; + final resp = await api.dio.get('/api/v1/chats/$chatId'); + final data = resp.data as Map?; String content = ''; final chatObj = data?['chat'] as Map?; if (chatObj != null) { @@ -1137,6 +1137,10 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ } }, onComplete: () { + api.clearPersistentStreamForMessage( + assistantMessageId, + expectedStreamId: streamId, + ); // Unregister from persistent service persistentService.unregisterStream(streamId); @@ -1159,6 +1163,10 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ }, ); + api.clearPersistentStreamForMessage( + assistantMessageId, + expectedStreamId: streamId, + ); try { persistentService.unregisterStream(streamId); } catch (_) {} diff --git a/lib/features/chat/providers/chat_providers.dart b/lib/features/chat/providers/chat_providers.dart index aa19ad8..35e4bbd 100644 --- a/lib/features/chat/providers/chat_providers.dart +++ b/lib/features/chat/providers/chat_providers.dart @@ -1447,7 +1447,7 @@ Future regenerateMessage( modelItem: modelItem, sessionId: effectiveSessionId, activeConversationId: activeConversation.id, - api: api, + api: api!, socketService: socketService, registerDeltaListener: registerDeltaListener, appendToLastMessage: (c) => @@ -1719,7 +1719,7 @@ Future _sendMessageInternal( final List ids = msg.attachmentIds ?? const []; if (ids.isNotEmpty) { final messageMap = await _buildMessagePayloadWithAttachments( - api: api, + api: api!, role: msg.role, cleanedText: cleaned, attachmentIds: ids, @@ -1995,7 +1995,7 @@ Future _sendMessageInternal( modelItem: modelItem, sessionId: effectiveSessionId, activeConversationId: activeConversation?.id, - api: api, + api: api!, socketService: socketService, registerDeltaListener: registerDeltaListener, appendToLastMessage: (c) =>