refactor: improve background streaming service and error handling

- Updated BackgroundStreamingService to start in the foreground immediately to prevent timeout issues, with a placeholder notification.
- Enhanced error handling in BackgroundStreamingHandler to catch exceptions when starting the foreground service, ensuring active streams are cleared if the service fails to start.
- Refactored saveStreamStatesForRecovery method to improve logging and clarity in stream state management.
- Added checks to close suspended stream controllers when transitioning to the foreground, enhancing resource management.
This commit is contained in:
cogwheel0
2025-10-05 23:16:44 +05:30
parent 8b06d5a179
commit a8ae2644f5
4 changed files with 136 additions and 22 deletions

View File

@@ -31,15 +31,22 @@ class BackgroundStreamingService : Service() {
override fun onCreate() { override fun onCreate() {
super.onCreate() super.onCreate()
println("BackgroundStreamingService: Service created") // Immediately start foreground to prevent timeout - will update with proper notification later
startForeground(NOTIFICATION_ID, createNotification(1))
println("BackgroundStreamingService: Service created with foreground notification")
} }
override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int { override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
// Always ensure we're foreground first to prevent timeout exceptions
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
startForegroundWithNotification(1)
}
when (intent?.action) { when (intent?.action) {
ACTION_START -> { ACTION_START -> {
val streamCount = intent.getIntExtra("streamCount", 1) val streamCount = intent.getIntExtra("streamCount", 1)
acquireWakeLock() acquireWakeLock()
startForegroundWithNotification(streamCount) updateNotification(streamCount)
println("BackgroundStreamingService: Started foreground service for $streamCount streams") println("BackgroundStreamingService: Started foreground service for $streamCount streams")
} }
ACTION_STOP -> { ACTION_STOP -> {
@@ -239,14 +246,20 @@ class BackgroundStreamingHandler(private val activity: MainActivity) : MethodCal
} }
private fun startForegroundService() { private fun startForegroundService() {
val serviceIntent = Intent(context, BackgroundStreamingService::class.java) try {
serviceIntent.putExtra("streamCount", activeStreams.size) val serviceIntent = Intent(context, BackgroundStreamingService::class.java)
serviceIntent.action = BackgroundStreamingService.ACTION_START serviceIntent.putExtra("streamCount", activeStreams.size)
serviceIntent.action = BackgroundStreamingService.ACTION_START
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) { if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
context.startForegroundService(serviceIntent) context.startForegroundService(serviceIntent)
} else { } else {
context.startService(serviceIntent) context.startService(serviceIntent)
}
} catch (e: Exception) {
println("BackgroundStreamingHandler: Failed to start foreground service: ${e.message}")
// Clear active streams as we couldn't start the service
activeStreams.clear()
} }
} }

View File

@@ -49,7 +49,7 @@ class BackgroundStreamingHandler {
onStreamsSuspending?.call(streamIds); onStreamsSuspending?.call(streamIds);
// Save stream states for recovery // Save stream states for recovery
await _saveStreamStatesForRecovery(streamIds, reason); await saveStreamStatesForRecovery(streamIds, reason);
break; break;
case 'backgroundTaskExpiring': case 'backgroundTaskExpiring':
@@ -188,7 +188,8 @@ class BackgroundStreamingHandler {
final recovered = <StreamState>[]; final recovered = <StreamState>[];
for (final stateData in states) { for (final stateData in states) {
final map = stateData as Map<String, dynamic>; // Platform channels return Map<Object?, Object?>, need to convert
final map = Map<String, dynamic>.from(stateData as Map);
final state = StreamState.fromMap(map); final state = StreamState.fromMap(map);
if (state != null) { if (state != null) {
recovered.add(state); recovered.add(state);
@@ -209,21 +210,38 @@ class BackgroundStreamingHandler {
} }
/// Save stream states for recovery after app restart /// Save stream states for recovery after app restart
Future<void> _saveStreamStatesForRecovery( Future<void> saveStreamStatesForRecovery(
List<String> streamIds, List<String> streamIds,
String reason, String reason,
) async { ) async {
DebugLogger.stream(
'saveStreamStatesForRecovery called',
scope: 'background',
data: {'streamIds': streamIds, 'reason': reason, 'statesCount': _streamStates.length},
);
final statesToSave = streamIds final statesToSave = streamIds
.map((id) => _streamStates[id]) .map((id) => _streamStates[id])
.where((state) => state != null) .where((state) => state != null)
.map((state) => state!.toMap()) .map((state) => state!.toMap())
.toList(); .toList();
DebugLogger.stream(
'statesToSave prepared',
scope: 'background',
data: {'count': statesToSave.length},
);
try { try {
await _channel.invokeMethod('saveStreamStates', { await _channel.invokeMethod('saveStreamStates', {
'states': statesToSave, 'states': statesToSave,
'reason': reason, 'reason': reason,
}); });
DebugLogger.stream(
'save-states-success',
scope: 'background',
data: {'count': statesToSave.length, 'reason': reason},
);
} catch (e) { } catch (e) {
DebugLogger.error( DebugLogger.error(
'save-states-failed', 'save-states-failed',

View File

@@ -150,6 +150,23 @@ class PersistentStreamingService with WidgetsBindingObserver {
_disableWakeLock(); _disableWakeLock();
} }
// Close any controllers for streams that were suspended in background
// This allows the onComplete handlers to fire now that we're in foreground
final suspendedStreams = _streamMetadata.entries
.where((e) => e.value['suspended'] == true)
.map((e) => e.key)
.toList();
for (final streamId in suspendedStreams) {
final controller = _streamControllers[streamId];
if (controller != null && !controller.isClosed) {
DebugLogger.stream(
'PersistentStreamingService: Closing suspended stream $streamId controller in foreground',
);
controller.close();
}
}
// Check and recover any interrupted streams // Check and recover any interrupted streams
_recoverActiveStreams(); _recoverActiveStreams();
} }
@@ -214,7 +231,18 @@ class PersistentStreamingService with WidgetsBindingObserver {
} }
// Unregister a stream // Unregister a stream
void unregisterStream(String streamId) { void unregisterStream(String streamId, {bool saveForRecovery = false}) {
// If app is in background and stream is unregistering, it might be due to
// network interruption - save state for recovery instead of just dropping it
if (_isInBackground && !saveForRecovery && _streamMetadata.containsKey(streamId)) {
DebugLogger.stream(
'PersistentStreamingService: Stream $streamId interrupted in background, saving for recovery',
);
// Don't unregister yet - keep it for recovery
_markStreamAsSuspended(streamId);
return;
}
_activeStreams.remove(streamId); _activeStreams.remove(streamId);
_streamControllers.remove(streamId); _streamControllers.remove(streamId);
_streamRecoveryCallbacks.remove(streamId); _streamRecoveryCallbacks.remove(streamId);
@@ -366,10 +394,20 @@ class PersistentStreamingService with WidgetsBindingObserver {
} }
void _saveStreamStatesForRecovery() { void _saveStreamStatesForRecovery() {
// The background handler will handle the actual saving if (_activeStreams.isEmpty) {
DebugLogger.stream(
'PersistentStreaming: No active streams to save for recovery',
);
return;
}
DebugLogger.stream( DebugLogger.stream(
'PersistentStreaming: Saving ${_activeStreams.length} stream states for recovery', 'PersistentStreaming: Saving ${_activeStreams.length} stream states for recovery',
); );
// Actually save the stream states through the background handler
final streamIds = _activeStreams.keys.toList();
_backgroundHandler.saveStreamStatesForRecovery(streamIds, 'app_detached');
} }
// Update stream metadata when chunks are received // Update stream metadata when chunks are received
@@ -433,6 +471,9 @@ class PersistentStreamingService with WidgetsBindingObserver {
// Get active stream count // Get active stream count
int get activeStreamCount => _activeStreams.length; int get activeStreamCount => _activeStreams.length;
// Check if app is in background
bool get isInBackground => _isInBackground;
// Get stream metadata // Get stream metadata
Map<String, dynamic>? getStreamMetadata(String streamId) { Map<String, dynamic>? getStreamMetadata(String streamId) {
return _streamMetadata[streamId]; return _streamMetadata[streamId];

View File

@@ -74,12 +74,54 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
final persistentController = StreamController<String>.broadcast(); final persistentController = StreamController<String>.broadcast();
final persistentService = PersistentStreamingService(); final persistentService = PersistentStreamingService();
final streamId = persistentService.registerStream( // Track if stream has received any data
subscription: stream.listen( bool hasReceivedData = false;
persistentController.add,
onDone: persistentController.close, // Create subscription first so we can reference it in onDone
onError: persistentController.addError, late final String streamId;
), final subscription = stream.listen(
(data) {
hasReceivedData = true;
persistentController.add(data);
},
onDone: () async {
DebugLogger.stream('Source stream onDone fired, hasReceivedData=$hasReceivedData');
// If stream closes immediately without data, it's likely due to backgrounding/network drop
// Not a natural completion
if (!hasReceivedData) {
DebugLogger.stream('Stream closed without data - likely interrupted, not completing');
// Check if app is backgrounding - if so, finish streaming with whatever we have
await Future.delayed(const Duration(milliseconds: 300));
if (persistentService.isInBackground) {
DebugLogger.stream('App backgrounding during stream - finishing with current content');
finishStreaming();
}
// Don't close the controller to prevent cascading completion handlers
return;
}
// For streams with data, delay to allow background detection
await Future.delayed(const Duration(milliseconds: 500));
final isInBg = persistentService.isInBackground;
DebugLogger.stream('Stream onDone check: streamId=$streamId, isInBackground=$isInBg');
// Check if we're in background before closing
if (!isInBg) {
DebugLogger.stream('Closing stream controller for $streamId (foreground completion)');
persistentController.close();
} else {
DebugLogger.stream('Source stream completed in background for $streamId - keeping open for recovery');
// Finish streaming to save the content we have
finishStreaming();
}
},
onError: persistentController.addError,
);
streamId = persistentService.registerStream(
subscription: subscription,
controller: persistentController, controller: persistentController,
recoveryCallback: () async { recoveryCallback: () async {
DebugLogger.log( DebugLogger.log(