fix(background): Improve background streaming reliability and error handling

feat(background): Add initialization and error handling for background streaming
feat(background): Improve background streaming reliability and error handling
feat(background): Improve iOS background task and stream management
refactor(android): Remove unused stream state persistence logic
feat(android): Improve wake lock and broadcast receiver handling
This commit is contained in:
cogwheel
2025-12-20 22:10:28 +05:30
parent 671b953f23
commit 6a07855c9b
6 changed files with 672 additions and 414 deletions

View File

@@ -3,10 +3,45 @@ import 'dart:io';
import 'package:flutter/services.dart';
import '../utils/debug_logger.dart';
/// Handles background streaming continuation for iOS and Android
/// Handles background streaming continuation for iOS and Android.
///
/// On iOS: Uses beginBackgroundTask (~30s) + BGTaskScheduler (~3+ minutes)
/// On Android: Uses foreground service notifications
/// This service keeps the app alive when streaming content in the background,
/// ensuring that chat responses, voice calls, and socket connections continue
/// even when the app is not in the foreground.
///
/// ## Platform Implementations
///
/// ### iOS
/// - Uses `beginBackgroundTask` for ~30 seconds of execution
/// - Uses `BGProcessingTask` for extended time (~1-3 minutes when granted)
/// - **Limitation**: iOS may not grant extended time; streams may be interrupted
/// - Audio mode (`UIBackgroundModes: audio`) provides reliable background for voice calls
///
/// ### Android
/// - Uses foreground service with notification (reliable, can run for hours)
/// - Acquires wake lock to prevent CPU sleep during active streaming
/// - **Android 14+**: dataSync services limited to 6 hours (we stop at 5h with warning)
///
/// ## Usage
///
/// For most streaming operations, only [startBackgroundExecution] and
/// [stopBackgroundExecution] are needed:
///
/// ```dart
/// // When streaming starts
/// await BackgroundStreamingHandler.instance.startBackgroundExecution(['stream-123']);
///
/// // When streaming completes
/// await BackgroundStreamingHandler.instance.stopBackgroundExecution(['stream-123']);
/// ```
///
/// For extended background sessions (e.g., voice calls), call [keepAlive] periodically:
///
/// ```dart
/// Timer.periodic(Duration(minutes: 5), (_) {
/// BackgroundStreamingHandler.instance.keepAlive();
/// });
/// ```
class BackgroundStreamingHandler {
static const MethodChannel _channel = MethodChannel(
'conduit/background_streaming',
@@ -25,7 +60,41 @@ class BackgroundStreamingHandler {
}
final Set<String> _activeStreamIds = <String>{};
final Map<String, StreamState> _streamStates = <String, StreamState>{};
final Set<String> _microphoneStreamIds = <String>{};
bool _initialized = false;
/// Initialize the background streaming handler with callbacks.
///
/// This should be called once during app startup to register error and
/// event callbacks.
Future<void> initialize({
void Function(String error, String errorType, List<String> streamIds)?
serviceFailedCallback,
void Function(int remainingMinutes)? timeLimitApproachingCallback,
void Function()? microphonePermissionFallbackCallback,
void Function(List<String> streamIds)? streamsSuspendingCallback,
void Function()? backgroundTaskExpiringCallback,
void Function(List<String> streamIds, int estimatedSeconds)?
backgroundTaskExtendedCallback,
void Function()? backgroundKeepAliveCallback,
}) async {
if (_initialized) {
DebugLogger.stream('already-initialized', scope: 'background');
return;
}
_initialized = true;
// Register callbacks
onServiceFailed = serviceFailedCallback;
onBackgroundTimeLimitApproaching = timeLimitApproachingCallback;
onMicrophonePermissionFallback = microphonePermissionFallbackCallback;
onStreamsSuspending = streamsSuspendingCallback;
onBackgroundTaskExpiring = backgroundTaskExpiringCallback;
onBackgroundTaskExtended = backgroundTaskExtendedCallback;
onBackgroundKeepAlive = backgroundKeepAliveCallback;
DebugLogger.stream('initialized', scope: 'background');
}
/// Returns count of actual content streams (excludes socket keepalive).
int get _userVisibleStreamCount =>
@@ -69,9 +138,6 @@ class BackgroundStreamingHandler {
data: {'count': streamIds.length, 'reason': reason},
);
onStreamsSuspending?.call(streamIds);
// Save stream states for recovery
await saveStreamStatesForRecovery(streamIds, reason);
break;
case 'backgroundTaskExpiring':
@@ -120,7 +186,6 @@ class BackgroundStreamingHandler {
// Clean up failed streams
for (final streamId in streamIds) {
_activeStreamIds.remove(streamId);
_streamStates.remove(streamId);
}
break;
@@ -139,10 +204,7 @@ class BackgroundStreamingHandler {
break;
case 'microphonePermissionFallback':
DebugLogger.stream(
'mic-permission-fallback',
scope: 'background',
);
DebugLogger.stream('mic-permission-fallback', scope: 'background');
onMicrophonePermissionFallback?.call();
break;
@@ -157,18 +219,24 @@ class BackgroundStreamingHandler {
}) async {
if (!Platform.isIOS && !Platform.isAndroid) return;
_activeStreamIds.addAll(streamIds);
try {
await _channel.invokeMethod('startBackgroundExecution', {
'streamIds': streamIds,
'requiresMicrophone': requiresMicrophone,
});
// Only add to active streams after successful platform call
_activeStreamIds.addAll(streamIds);
// Track which streams require microphone for reconciliation
if (requiresMicrophone) {
_microphoneStreamIds.addAll(streamIds);
}
DebugLogger.stream(
'start',
scope: 'background',
data: {'count': streamIds.length},
data: {'count': streamIds.length, 'mic': requiresMicrophone},
);
} catch (e) {
DebugLogger.error(
@@ -177,6 +245,8 @@ class BackgroundStreamingHandler {
error: e,
data: {'count': streamIds.length},
);
// Re-throw so callers know the background execution failed
rethrow;
}
}
@@ -184,20 +254,27 @@ class BackgroundStreamingHandler {
Future<void> stopBackgroundExecution(List<String> streamIds) async {
if (!Platform.isIOS && !Platform.isAndroid) return;
_activeStreamIds.removeAll(streamIds);
streamIds.forEach(_streamStates.remove);
try {
await _channel.invokeMethod('stopBackgroundExecution', {
'streamIds': streamIds,
});
// Only remove from tracking after successful platform call
// to maintain state consistency between Flutter and native layers
_activeStreamIds.removeAll(streamIds);
_microphoneStreamIds.removeAll(streamIds);
DebugLogger.stream(
'stop',
scope: 'background',
data: {'count': streamIds.length},
);
} catch (e) {
// Still remove from local tracking on error - the platform may have
// already stopped, and keeping stale state causes issues
_activeStreamIds.removeAll(streamIds);
_microphoneStreamIds.removeAll(streamIds);
DebugLogger.error(
'stop-failed',
scope: 'background',
@@ -207,68 +284,18 @@ class BackgroundStreamingHandler {
}
}
/// Register a stream with its current state
void registerStream(
String streamId, {
required String conversationId,
required String messageId,
String? sessionId,
int? lastChunkSequence,
String? lastContent,
}) {
_streamStates[streamId] = StreamState(
streamId: streamId,
conversationId: conversationId,
messageId: messageId,
sessionId: sessionId,
lastChunkSequence: lastChunkSequence ?? 0,
lastContent: lastContent ?? '',
timestamp: DateTime.now(),
);
_activeStreamIds.add(streamId);
}
/// Update stream state with new chunk
void updateStreamState(
String streamId, {
int? chunkSequence,
String? content,
String? appendedContent,
}) {
final state = _streamStates[streamId];
if (state == null) return;
_streamStates[streamId] = state.copyWith(
lastChunkSequence: chunkSequence ?? state.lastChunkSequence,
lastContent: appendedContent != null
? (state.lastContent + appendedContent)
: (content ?? state.lastContent),
timestamp: DateTime.now(),
);
}
/// Unregister a stream when it completes
void unregisterStream(String streamId) {
_activeStreamIds.remove(streamId);
_streamStates.remove(streamId);
}
/// Get current stream state for recovery
StreamState? getStreamState(String streamId) {
return _streamStates[streamId];
}
/// Keep alive the background task
///
/// On iOS: Refreshes background task to prevent early termination
/// On Android: Refreshes wake lock to keep service running
Future<void> keepAlive() async {
if (!Platform.isIOS && !Platform.isAndroid) return;
///
/// Returns true if keep-alive succeeded, false otherwise.
Future<bool> keepAlive() async {
if (!Platform.isIOS && !Platform.isAndroid) return true;
// Skip keep-alive if no active streams - this ensures Android's count
// stays synchronized with Flutter's actual state
if (_activeStreamIds.isEmpty) return;
if (_activeStreamIds.isEmpty) return true;
try {
await _channel.invokeMethod('keepAlive', {
@@ -277,8 +304,32 @@ class BackgroundStreamingHandler {
'streamCount': _userVisibleStreamCount,
});
DebugLogger.stream('keepalive-success', scope: 'background');
return true;
} catch (e) {
DebugLogger.error('keepalive-failed', scope: 'background', error: e);
return false;
}
}
/// Check if background app refresh is enabled (iOS only).
///
/// Returns true on Android or if iOS background refresh is available.
/// Returns false if iOS background refresh is disabled by user.
Future<bool> checkBackgroundRefreshStatus() async {
if (!Platform.isIOS) return true;
try {
final bool? status = await _channel.invokeMethod<bool>(
'checkBackgroundRefreshStatus',
);
return status ?? true;
} catch (e) {
DebugLogger.error(
'check-background-refresh-failed',
scope: 'background',
error: e,
);
return true; // Assume available on error to not block functionality
}
}
@@ -304,179 +355,97 @@ class BackgroundStreamingHandler {
}
}
/// Recover stream states from previous app session
Future<List<StreamState>> recoverStreamStates() async {
if (!Platform.isIOS && !Platform.isAndroid) return [];
try {
final List<dynamic>? states = await _channel.invokeMethod(
'recoverStreamStates',
);
if (states == null) return [];
final recovered = <StreamState>[];
for (final stateData in states) {
// Platform channels return Map<Object?, Object?>, need to convert
final map = Map<String, dynamic>.from(stateData as Map);
final state = StreamState.fromMap(map);
if (state != null) {
recovered.add(state);
_streamStates[state.streamId] = state;
}
}
DebugLogger.stream(
'recovered',
scope: 'background',
data: {'count': recovered.length},
);
return recovered;
} catch (e) {
DebugLogger.error('recover-failed', scope: 'background', error: e);
return [];
}
}
/// Save stream states for recovery after app restart
Future<void> saveStreamStatesForRecovery(
List<String> streamIds,
String reason,
) async {
DebugLogger.stream(
'saveStreamStatesForRecovery called',
scope: 'background',
data: {
'streamIds': streamIds,
'reason': reason,
'statesCount': _streamStates.length,
},
);
final statesToSave = streamIds
.map((id) => _streamStates[id])
.where((state) => state != null)
.map((state) => state!.toMap())
.toList();
DebugLogger.stream(
'statesToSave prepared',
scope: 'background',
data: {'count': statesToSave.length},
);
try {
await _channel.invokeMethod('saveStreamStates', {
'states': statesToSave,
'reason': reason,
});
DebugLogger.stream(
'save-states-success',
scope: 'background',
data: {'count': statesToSave.length, 'reason': reason},
);
} catch (e) {
DebugLogger.error(
'save-states-failed',
scope: 'background',
error: e,
data: {'count': streamIds.length, 'reason': reason},
);
}
}
/// Check if any streams are currently active
bool get hasActiveStreams => _activeStreamIds.isNotEmpty;
/// Get list of active stream IDs
List<String> get activeStreamIds => _activeStreamIds.toList();
/// Clear all stream data (usually on app termination)
void clearAll() {
_activeStreamIds.clear();
_streamStates.clear();
}
}
/// Notify the native layer that an external component (e.g., speech_to_text
/// plugin) is managing the audio session.
///
/// On iOS, this prevents VoiceBackgroundAudioManager from conflicting with
/// the speech_to_text plugin's audio session management.
/// On Android, this is a no-op as audio session management is different.
Future<void> setExternalAudioSessionOwner(bool isExternal) async {
if (!Platform.isIOS) return;
/// Represents the state of a streaming request
class StreamState {
final String streamId;
final String conversationId;
final String messageId;
final String? sessionId;
final int lastChunkSequence;
final String lastContent;
final DateTime timestamp;
const StreamState({
required this.streamId,
required this.conversationId,
required this.messageId,
this.sessionId,
required this.lastChunkSequence,
required this.lastContent,
required this.timestamp,
});
StreamState copyWith({
String? streamId,
String? conversationId,
String? messageId,
String? sessionId,
int? lastChunkSequence,
String? lastContent,
DateTime? timestamp,
}) {
return StreamState(
streamId: streamId ?? this.streamId,
conversationId: conversationId ?? this.conversationId,
messageId: messageId ?? this.messageId,
sessionId: sessionId ?? this.sessionId,
lastChunkSequence: lastChunkSequence ?? this.lastChunkSequence,
lastContent: lastContent ?? this.lastContent,
timestamp: timestamp ?? this.timestamp,
);
}
Map<String, dynamic> toMap() {
return {
'streamId': streamId,
'conversationId': conversationId,
'messageId': messageId,
'sessionId': sessionId,
'lastChunkSequence': lastChunkSequence,
'lastContent': lastContent,
'timestamp': timestamp.millisecondsSinceEpoch,
};
}
static StreamState? fromMap(Map<String, dynamic> map) {
try {
return StreamState(
streamId: map['streamId'] as String,
conversationId: map['conversationId'] as String,
messageId: map['messageId'] as String,
sessionId: map['sessionId'] as String?,
lastChunkSequence: map['lastChunkSequence'] as int? ?? 0,
lastContent: map['lastContent'] as String? ?? '',
timestamp: DateTime.fromMillisecondsSinceEpoch(
map['timestamp'] as int? ?? DateTime.now().millisecondsSinceEpoch,
),
await _channel.invokeMethod('setExternalAudioSessionOwner', {
'isExternal': isExternal,
});
DebugLogger.stream(
isExternal
? 'external-audio-owner-set'
: 'external-audio-owner-cleared',
scope: 'background',
);
} catch (e) {
DebugLogger.error('parse-failed', scope: 'background', error: e);
return null;
DebugLogger.error(
'set-external-audio-owner-failed',
scope: 'background',
error: e,
);
}
}
/// Check if this state is stale (older than threshold)
bool isStale({Duration threshold = const Duration(minutes: 5)}) {
return DateTime.now().difference(timestamp) > threshold;
/// Clear all stream data (usually on app termination)
void clearAll() {
_activeStreamIds.clear();
_microphoneStreamIds.clear();
}
@override
String toString() {
return 'StreamState(streamId: $streamId, conversationId: $conversationId, '
'messageId: $messageId, sequence: $lastChunkSequence, '
'contentLength: ${lastContent.length}, timestamp: $timestamp)';
/// Reconcile Flutter state with native platform state.
///
/// This should be called on app resume to detect and fix state drift
/// caused by native service crashes or other edge cases. Returns true
/// if reconciliation was needed and performed.
Future<bool> reconcileState() async {
if (!Platform.isIOS && !Platform.isAndroid) return false;
try {
final int? nativeCount = await _channel.invokeMethod<int>(
'getActiveStreamCount',
);
if (nativeCount == null) return false;
// If native has streams but Flutter doesn't, the native service is orphaned
if (nativeCount > 0 && _activeStreamIds.isEmpty) {
DebugLogger.warning(
'reconcile-orphaned-service',
scope: 'background',
data: {'nativeCount': nativeCount},
);
// Stop the orphaned native service
await _channel.invokeMethod('stopAllBackgroundExecution');
return true;
}
// If Flutter has streams but native doesn't, restart the service
if (_activeStreamIds.isNotEmpty && nativeCount == 0) {
// Preserve microphone requirement from tracked streams
final requiresMicrophone = _microphoneStreamIds.isNotEmpty;
DebugLogger.warning(
'reconcile-restart-service',
scope: 'background',
data: {
'flutterCount': _activeStreamIds.length,
'requiresMic': requiresMicrophone,
},
);
// Restart background execution for active streams with preserved capabilities
await _channel.invokeMethod('startBackgroundExecution', {
'streamIds': _activeStreamIds.toList(),
'requiresMicrophone': requiresMicrophone,
});
return true;
}
return false;
} catch (e) {
DebugLogger.error('reconcile-failed', scope: 'background', error: e);
return false;
}
}
}