From a8ae2644f5b65e2a391327391c78f77c579fb565 Mon Sep 17 00:00:00 2001 From: cogwheel0 <172976095+cogwheel0@users.noreply.github.com> Date: Sun, 5 Oct 2025 23:16:44 +0530 Subject: [PATCH] refactor: improve background streaming service and error handling - Updated BackgroundStreamingService to start in the foreground immediately to prevent timeout issues, with a placeholder notification. - Enhanced error handling in BackgroundStreamingHandler to catch exceptions when starting the foreground service, ensuring active streams are cleared if the service fails to start. - Refactored saveStreamStatesForRecovery method to improve logging and clarity in stream state management. - Added checks to close suspended stream controllers when transitioning to the foreground, enhancing resource management. --- .../conduit/BackgroundStreamingHandler.kt | 35 ++++++++---- .../background_streaming_handler.dart | 24 +++++++-- .../persistent_streaming_service.dart | 45 +++++++++++++++- lib/core/services/streaming_helper.dart | 54 ++++++++++++++++--- 4 files changed, 136 insertions(+), 22 deletions(-) diff --git a/android/app/src/main/kotlin/app/cogwheel/conduit/BackgroundStreamingHandler.kt b/android/app/src/main/kotlin/app/cogwheel/conduit/BackgroundStreamingHandler.kt index 4a96c32..9b8ce36 100644 --- a/android/app/src/main/kotlin/app/cogwheel/conduit/BackgroundStreamingHandler.kt +++ b/android/app/src/main/kotlin/app/cogwheel/conduit/BackgroundStreamingHandler.kt @@ -31,15 +31,22 @@ class BackgroundStreamingService : Service() { override fun onCreate() { super.onCreate() - println("BackgroundStreamingService: Service created") + // Immediately start foreground to prevent timeout - will update with proper notification later + startForeground(NOTIFICATION_ID, createNotification(1)) + println("BackgroundStreamingService: Service created with foreground notification") } override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int { + // Always ensure we're foreground first to prevent timeout exceptions + if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) { + startForegroundWithNotification(1) + } + when (intent?.action) { ACTION_START -> { val streamCount = intent.getIntExtra("streamCount", 1) acquireWakeLock() - startForegroundWithNotification(streamCount) + updateNotification(streamCount) println("BackgroundStreamingService: Started foreground service for $streamCount streams") } ACTION_STOP -> { @@ -51,7 +58,7 @@ class BackgroundStreamingService : Service() { updateNotification(streamCount) } } - + return START_STICKY // Restart if killed by system } @@ -239,14 +246,20 @@ class BackgroundStreamingHandler(private val activity: MainActivity) : MethodCal } private fun startForegroundService() { - val serviceIntent = Intent(context, BackgroundStreamingService::class.java) - serviceIntent.putExtra("streamCount", activeStreams.size) - serviceIntent.action = BackgroundStreamingService.ACTION_START - - if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) { - context.startForegroundService(serviceIntent) - } else { - context.startService(serviceIntent) + try { + val serviceIntent = Intent(context, BackgroundStreamingService::class.java) + serviceIntent.putExtra("streamCount", activeStreams.size) + serviceIntent.action = BackgroundStreamingService.ACTION_START + + if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) { + context.startForegroundService(serviceIntent) + } else { + context.startService(serviceIntent) + } + } catch (e: Exception) { + println("BackgroundStreamingHandler: Failed to start foreground service: ${e.message}") + // Clear active streams as we couldn't start the service + activeStreams.clear() } } diff --git a/lib/core/services/background_streaming_handler.dart b/lib/core/services/background_streaming_handler.dart index 296e23f..7670a44 100644 --- a/lib/core/services/background_streaming_handler.dart +++ b/lib/core/services/background_streaming_handler.dart @@ -49,7 +49,7 @@ class BackgroundStreamingHandler { onStreamsSuspending?.call(streamIds); // Save stream states for recovery - await _saveStreamStatesForRecovery(streamIds, reason); + await saveStreamStatesForRecovery(streamIds, reason); break; case 'backgroundTaskExpiring': @@ -188,7 +188,8 @@ class BackgroundStreamingHandler { final recovered = []; for (final stateData in states) { - final map = stateData as Map; + // Platform channels return Map, need to convert + final map = Map.from(stateData as Map); final state = StreamState.fromMap(map); if (state != null) { recovered.add(state); @@ -209,21 +210,38 @@ class BackgroundStreamingHandler { } /// Save stream states for recovery after app restart - Future _saveStreamStatesForRecovery( + Future saveStreamStatesForRecovery( List streamIds, String reason, ) async { + DebugLogger.stream( + 'saveStreamStatesForRecovery called', + scope: 'background', + data: {'streamIds': streamIds, 'reason': reason, 'statesCount': _streamStates.length}, + ); + final statesToSave = streamIds .map((id) => _streamStates[id]) .where((state) => state != null) .map((state) => state!.toMap()) .toList(); + DebugLogger.stream( + 'statesToSave prepared', + scope: 'background', + data: {'count': statesToSave.length}, + ); + try { await _channel.invokeMethod('saveStreamStates', { 'states': statesToSave, 'reason': reason, }); + DebugLogger.stream( + 'save-states-success', + scope: 'background', + data: {'count': statesToSave.length, 'reason': reason}, + ); } catch (e) { DebugLogger.error( 'save-states-failed', diff --git a/lib/core/services/persistent_streaming_service.dart b/lib/core/services/persistent_streaming_service.dart index 5d6545a..e746139 100644 --- a/lib/core/services/persistent_streaming_service.dart +++ b/lib/core/services/persistent_streaming_service.dart @@ -150,6 +150,23 @@ class PersistentStreamingService with WidgetsBindingObserver { _disableWakeLock(); } + // Close any controllers for streams that were suspended in background + // This allows the onComplete handlers to fire now that we're in foreground + final suspendedStreams = _streamMetadata.entries + .where((e) => e.value['suspended'] == true) + .map((e) => e.key) + .toList(); + + for (final streamId in suspendedStreams) { + final controller = _streamControllers[streamId]; + if (controller != null && !controller.isClosed) { + DebugLogger.stream( + 'PersistentStreamingService: Closing suspended stream $streamId controller in foreground', + ); + controller.close(); + } + } + // Check and recover any interrupted streams _recoverActiveStreams(); } @@ -214,7 +231,18 @@ class PersistentStreamingService with WidgetsBindingObserver { } // Unregister a stream - void unregisterStream(String streamId) { + void unregisterStream(String streamId, {bool saveForRecovery = false}) { + // If app is in background and stream is unregistering, it might be due to + // network interruption - save state for recovery instead of just dropping it + if (_isInBackground && !saveForRecovery && _streamMetadata.containsKey(streamId)) { + DebugLogger.stream( + 'PersistentStreamingService: Stream $streamId interrupted in background, saving for recovery', + ); + // Don't unregister yet - keep it for recovery + _markStreamAsSuspended(streamId); + return; + } + _activeStreams.remove(streamId); _streamControllers.remove(streamId); _streamRecoveryCallbacks.remove(streamId); @@ -366,10 +394,20 @@ class PersistentStreamingService with WidgetsBindingObserver { } void _saveStreamStatesForRecovery() { - // The background handler will handle the actual saving + if (_activeStreams.isEmpty) { + DebugLogger.stream( + 'PersistentStreaming: No active streams to save for recovery', + ); + return; + } + DebugLogger.stream( 'PersistentStreaming: Saving ${_activeStreams.length} stream states for recovery', ); + + // Actually save the stream states through the background handler + final streamIds = _activeStreams.keys.toList(); + _backgroundHandler.saveStreamStatesForRecovery(streamIds, 'app_detached'); } // Update stream metadata when chunks are received @@ -433,6 +471,9 @@ class PersistentStreamingService with WidgetsBindingObserver { // Get active stream count int get activeStreamCount => _activeStreams.length; + // Check if app is in background + bool get isInBackground => _isInBackground; + // Get stream metadata Map? getStreamMetadata(String streamId) { return _streamMetadata[streamId]; diff --git a/lib/core/services/streaming_helper.dart b/lib/core/services/streaming_helper.dart index 585dffe..a5b61b1 100644 --- a/lib/core/services/streaming_helper.dart +++ b/lib/core/services/streaming_helper.dart @@ -74,12 +74,54 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ final persistentController = StreamController.broadcast(); final persistentService = PersistentStreamingService(); - final streamId = persistentService.registerStream( - subscription: stream.listen( - persistentController.add, - onDone: persistentController.close, - onError: persistentController.addError, - ), + // Track if stream has received any data + bool hasReceivedData = false; + + // Create subscription first so we can reference it in onDone + late final String streamId; + final subscription = stream.listen( + (data) { + hasReceivedData = true; + persistentController.add(data); + }, + onDone: () async { + DebugLogger.stream('Source stream onDone fired, hasReceivedData=$hasReceivedData'); + + // If stream closes immediately without data, it's likely due to backgrounding/network drop + // Not a natural completion + if (!hasReceivedData) { + DebugLogger.stream('Stream closed without data - likely interrupted, not completing'); + // Check if app is backgrounding - if so, finish streaming with whatever we have + await Future.delayed(const Duration(milliseconds: 300)); + if (persistentService.isInBackground) { + DebugLogger.stream('App backgrounding during stream - finishing with current content'); + finishStreaming(); + } + // Don't close the controller to prevent cascading completion handlers + return; + } + + // For streams with data, delay to allow background detection + await Future.delayed(const Duration(milliseconds: 500)); + + final isInBg = persistentService.isInBackground; + DebugLogger.stream('Stream onDone check: streamId=$streamId, isInBackground=$isInBg'); + + // Check if we're in background before closing + if (!isInBg) { + DebugLogger.stream('Closing stream controller for $streamId (foreground completion)'); + persistentController.close(); + } else { + DebugLogger.stream('Source stream completed in background for $streamId - keeping open for recovery'); + // Finish streaming to save the content we have + finishStreaming(); + } + }, + onError: persistentController.addError, + ); + + streamId = persistentService.registerStream( + subscription: subscription, controller: persistentController, recoveryCallback: () async { DebugLogger.log(