feat: implement service failure handling in background streaming
- Added a method to send failure notifications to Flutter when the background service fails to enter the foreground. - Implemented a broadcast receiver to handle service failure notifications and notify Flutter about the failure. - Enhanced the persistent streaming service to attempt recovery for failed streams. - Introduced heartbeat monitoring for SSE streams to detect stale connections and trigger recovery actions.
This commit is contained in:
@@ -102,10 +102,20 @@ class BackgroundStreamingService : Service() {
|
||||
} catch (e: Exception) {
|
||||
// Catch all exceptions including ForegroundServiceStartNotAllowedException
|
||||
println("BackgroundStreamingService: Failed to enter foreground: ${e.javaClass.simpleName}: ${e.message}")
|
||||
// Notify Flutter about the failure
|
||||
sendFailureNotification(e)
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
private fun sendFailureNotification(e: Exception) {
|
||||
// Send broadcast intent to notify MainActivity
|
||||
val intent = Intent("app.cogwheel.conduit.FOREGROUND_SERVICE_FAILED")
|
||||
intent.putExtra("error", e.message ?: "Unknown error")
|
||||
intent.putExtra("errorType", e.javaClass.simpleName)
|
||||
sendBroadcast(intent)
|
||||
}
|
||||
|
||||
private fun updateForegroundType(notification: Notification, type: Int) {
|
||||
if (Build.VERSION.SDK_INT < Build.VERSION_CODES.Q) return
|
||||
try {
|
||||
@@ -248,6 +258,7 @@ class BackgroundStreamingHandler(private val activity: MainActivity) : MethodCal
|
||||
private val streamsRequiringMic = mutableSetOf<String>()
|
||||
private var backgroundJob: Job? = null
|
||||
private val scope = CoroutineScope(Dispatchers.Main + SupervisorJob())
|
||||
private var serviceFailureReceiver: android.content.BroadcastReceiver? = null
|
||||
|
||||
companion object {
|
||||
private const val CHANNEL_NAME = "conduit/background_streaming"
|
||||
@@ -262,6 +273,38 @@ class BackgroundStreamingHandler(private val activity: MainActivity) : MethodCal
|
||||
sharedPrefs = context.getSharedPreferences(PREFS_NAME, Context.MODE_PRIVATE)
|
||||
|
||||
createNotificationChannel()
|
||||
setupServiceFailureReceiver()
|
||||
}
|
||||
|
||||
private fun setupServiceFailureReceiver() {
|
||||
serviceFailureReceiver = object : android.content.BroadcastReceiver() {
|
||||
override fun onReceive(context: Context?, intent: Intent?) {
|
||||
if (intent?.action == "app.cogwheel.conduit.FOREGROUND_SERVICE_FAILED") {
|
||||
val error = intent.getStringExtra("error") ?: "Unknown error"
|
||||
val errorType = intent.getStringExtra("errorType") ?: "Exception"
|
||||
|
||||
println("BackgroundStreamingHandler: Service failure received: $errorType - $error")
|
||||
|
||||
// Notify Flutter about the service failure
|
||||
channel.invokeMethod("serviceFailed", mapOf(
|
||||
"error" to error,
|
||||
"errorType" to errorType,
|
||||
"streamIds" to activeStreams.toList()
|
||||
))
|
||||
|
||||
// Clear active streams since service failed
|
||||
activeStreams.clear()
|
||||
streamsRequiringMic.clear()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
val filter = android.content.IntentFilter("app.cogwheel.conduit.FOREGROUND_SERVICE_FAILED")
|
||||
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.TIRAMISU) {
|
||||
context.registerReceiver(serviceFailureReceiver, filter, Context.RECEIVER_NOT_EXPORTED)
|
||||
} else {
|
||||
context.registerReceiver(serviceFailureReceiver, filter)
|
||||
}
|
||||
}
|
||||
|
||||
override fun onMethodCall(call: MethodCall, result: Result) {
|
||||
@@ -514,5 +557,15 @@ class BackgroundStreamingHandler(private val activity: MainActivity) : MethodCal
|
||||
scope.cancel()
|
||||
stopBackgroundMonitoring()
|
||||
stopForegroundService()
|
||||
|
||||
// Unregister broadcast receiver
|
||||
try {
|
||||
serviceFailureReceiver?.let {
|
||||
context.unregisterReceiver(it)
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
println("BackgroundStreamingHandler: Error unregistering receiver: ${e.message}")
|
||||
}
|
||||
serviceFailureReceiver = null
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3067,6 +3067,29 @@ class ApiService {
|
||||
final Map<String, CancelToken> _streamCancelTokens = {};
|
||||
final Map<String, String> _messagePersistentStreamIds = {};
|
||||
|
||||
/// Associates a streaming message with its persistent stream identifier.
|
||||
void registerPersistentStreamForMessage(String messageId, String streamId) {
|
||||
_messagePersistentStreamIds[messageId] = streamId;
|
||||
}
|
||||
|
||||
/// Removes the persistent stream mapping for a message if it matches.
|
||||
///
|
||||
/// Returns the removed persistent stream identifier when one existed and
|
||||
/// matched the optional [expectedStreamId].
|
||||
String? clearPersistentStreamForMessage(
|
||||
String messageId, {
|
||||
String? expectedStreamId,
|
||||
}) {
|
||||
final current = _messagePersistentStreamIds[messageId];
|
||||
if (current == null) {
|
||||
return null;
|
||||
}
|
||||
if (expectedStreamId != null && current != expectedStreamId) {
|
||||
return null;
|
||||
}
|
||||
return _messagePersistentStreamIds.remove(messageId);
|
||||
}
|
||||
|
||||
// Send message using dual-stream approach (HTTP SSE + WebSocket events).
|
||||
// Matches OpenWebUI web client behavior:
|
||||
// - HTTP SSE stream provides immediate content chunks
|
||||
@@ -3211,9 +3234,13 @@ class ApiService {
|
||||
(data['params'] as Map<String, dynamic>?) ?? <String, dynamic>{};
|
||||
params['function_calling'] = functionCallingMode;
|
||||
data['params'] = params;
|
||||
_traceApi('Set params.function_calling = $functionCallingMode (from user settings)');
|
||||
_traceApi(
|
||||
'Set params.function_calling = $functionCallingMode (from user settings)',
|
||||
);
|
||||
} else {
|
||||
_traceApi('No function_calling preference in user settings, backend will use default mode');
|
||||
_traceApi(
|
||||
'No function_calling preference in user settings, backend will use default mode',
|
||||
);
|
||||
}
|
||||
} catch (_) {
|
||||
// Non-fatal; continue without setting function_calling mode
|
||||
@@ -3288,8 +3315,17 @@ class ApiService {
|
||||
data: data,
|
||||
options: Options(
|
||||
responseType: ResponseType.stream,
|
||||
// Extended timeout for streaming responses - allow up to 10 minutes
|
||||
// for long-running tool calls and reasoning
|
||||
receiveTimeout: const Duration(minutes: 10),
|
||||
// Shorter send timeout for the initial request
|
||||
sendTimeout: const Duration(seconds: 30),
|
||||
headers: {
|
||||
'Accept': 'text/event-stream',
|
||||
// Enable HTTP keep-alive to maintain connection in background
|
||||
'Connection': 'keep-alive',
|
||||
// Request server to send keep-alive messages
|
||||
'Cache-Control': 'no-cache',
|
||||
},
|
||||
),
|
||||
cancelToken: cancelToken,
|
||||
@@ -3318,6 +3354,17 @@ class ApiService {
|
||||
await for (final chunk in SSEStreamParser.parseResponseStream(
|
||||
respData,
|
||||
splitLargeDeltas: false,
|
||||
heartbeatTimeout: const Duration(minutes: 2),
|
||||
onHeartbeat: () {
|
||||
// Notify persistent streaming service that connection is alive
|
||||
final persistentStreamId = _messagePersistentStreamIds[messageId];
|
||||
if (persistentStreamId != null) {
|
||||
PersistentStreamingService().updateStreamProgress(
|
||||
persistentStreamId,
|
||||
chunkSequence: DateTime.now().millisecondsSinceEpoch,
|
||||
);
|
||||
}
|
||||
},
|
||||
)) {
|
||||
if (!streamController.isClosed) {
|
||||
streamController.add(chunk);
|
||||
@@ -3399,7 +3446,7 @@ class ApiService {
|
||||
} catch (_) {}
|
||||
|
||||
try {
|
||||
final pid = _messagePersistentStreamIds.remove(messageId);
|
||||
final pid = clearPersistentStreamForMessage(messageId);
|
||||
if (pid != null) {
|
||||
PersistentStreamingService().unregisterStream(pid);
|
||||
}
|
||||
|
||||
@@ -30,6 +30,8 @@ class BackgroundStreamingHandler {
|
||||
onBackgroundTaskExtended;
|
||||
void Function()? onBackgroundKeepAlive;
|
||||
bool Function()? shouldContinueInBackground;
|
||||
void Function(String error, String errorType, List<String> streamIds)?
|
||||
onServiceFailed;
|
||||
|
||||
void _setupMethodCallHandler() {
|
||||
_channel.setMethodCallHandler((call) async {
|
||||
@@ -79,6 +81,31 @@ class BackgroundStreamingHandler {
|
||||
DebugLogger.stream('keepalive-signal', scope: 'background');
|
||||
onBackgroundKeepAlive?.call();
|
||||
break;
|
||||
|
||||
case 'serviceFailed':
|
||||
final Map<String, dynamic> args =
|
||||
call.arguments as Map<String, dynamic>;
|
||||
final String error = args['error'] as String? ?? 'Unknown error';
|
||||
final String errorType = args['errorType'] as String? ?? 'Exception';
|
||||
final List<String> streamIds =
|
||||
(args['streamIds'] as List?)?.cast<String>() ?? [];
|
||||
|
||||
DebugLogger.error(
|
||||
'service-failed',
|
||||
scope: 'background',
|
||||
error: error,
|
||||
data: {'type': errorType, 'streams': streamIds.length},
|
||||
);
|
||||
|
||||
// Notify callback about service failure
|
||||
onServiceFailed?.call(error, errorType, streamIds);
|
||||
|
||||
// Clean up failed streams
|
||||
for (final streamId in streamIds) {
|
||||
_activeStreamIds.remove(streamId);
|
||||
_streamStates.remove(streamId);
|
||||
}
|
||||
break;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -46,6 +46,28 @@ class PersistentStreamingService with WidgetsBindingObserver {
|
||||
}
|
||||
|
||||
void _setupBackgroundHandlerCallbacks() {
|
||||
_backgroundHandler.onServiceFailed = (error, errorType, streamIds) {
|
||||
DebugLogger.error(
|
||||
'background-service-failed',
|
||||
scope: 'streaming/persistent',
|
||||
error: '$errorType: $error',
|
||||
data: {'affectedStreams': streamIds},
|
||||
);
|
||||
|
||||
// Attempt immediate recovery for failed streams
|
||||
for (final streamId in streamIds) {
|
||||
final callback = _streamRecoveryCallbacks[streamId];
|
||||
if (callback != null) {
|
||||
// Schedule recovery after a short delay
|
||||
Future.delayed(const Duration(seconds: 2), () {
|
||||
if (_activeStreams.containsKey(streamId)) {
|
||||
_attemptStreamRecovery(streamId, callback);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
_backgroundHandler.onStreamsSuspending = (streamIds) {
|
||||
DebugLogger.stream(
|
||||
'PersistentStreaming: Streams suspending - $streamIds',
|
||||
@@ -123,10 +145,47 @@ class PersistentStreamingService with WidgetsBindingObserver {
|
||||
_heartbeatTimer = Timer.periodic(const Duration(seconds: 30), (_) {
|
||||
if (_activeStreams.isNotEmpty && _isInBackground) {
|
||||
_backgroundHandler.keepAlive();
|
||||
|
||||
// Check for stale streams during background operation
|
||||
_checkStreamHealth();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void _checkStreamHealth() {
|
||||
final now = DateTime.now();
|
||||
final staleStreams = <String>[];
|
||||
|
||||
for (final entry in _streamMetadata.entries) {
|
||||
final streamId = entry.key;
|
||||
final metadata = entry.value;
|
||||
final lastUpdate = metadata['lastUpdate'] as DateTime?;
|
||||
|
||||
if (lastUpdate != null) {
|
||||
final timeSinceUpdate = now.difference(lastUpdate);
|
||||
|
||||
// If no update in 90 seconds while in background, consider stale
|
||||
if (timeSinceUpdate > const Duration(seconds: 90)) {
|
||||
DebugLogger.warning(
|
||||
'Stream $streamId appears stale: ${timeSinceUpdate.inSeconds}s since last update',
|
||||
);
|
||||
staleStreams.add(streamId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Attempt recovery for stale streams
|
||||
for (final streamId in staleStreams) {
|
||||
final callback = _streamRecoveryCallbacks[streamId];
|
||||
if (callback != null && _retryAttempts[streamId] == null) {
|
||||
DebugLogger.stream(
|
||||
'Initiating recovery for stale stream: $streamId',
|
||||
);
|
||||
_attemptStreamRecovery(streamId, callback);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@override
|
||||
void didChangeAppLifecycleState(AppLifecycleState state) {
|
||||
// _lastLifecycleState = state; // Removed as it's unused
|
||||
@@ -385,13 +444,26 @@ class PersistentStreamingService with WidgetsBindingObserver {
|
||||
final metadata = _streamMetadata[streamId];
|
||||
if (metadata == null) return false;
|
||||
|
||||
// Check if stream is marked as suspended
|
||||
if (metadata['suspended'] == true) {
|
||||
final suspendedAt = metadata['suspendedAt'] as DateTime?;
|
||||
if (suspendedAt != null) {
|
||||
final timeSinceSuspend = DateTime.now().difference(suspendedAt);
|
||||
// Try to recover suspended streams after 10 seconds
|
||||
return timeSinceSuspend > const Duration(seconds: 10);
|
||||
}
|
||||
}
|
||||
|
||||
// Check if stream has been inactive for too long
|
||||
final lastUpdate = metadata['lastUpdate'] as DateTime?;
|
||||
if (lastUpdate != null) {
|
||||
final timeSinceUpdate = DateTime.now().difference(lastUpdate);
|
||||
// Align with app-side watchdogs: be less aggressive than UI guard
|
||||
// but still attempt recovery before server timeouts become likely.
|
||||
return timeSinceUpdate > const Duration(minutes: 2);
|
||||
// In background: 90 seconds
|
||||
// In foreground: 2 minutes
|
||||
final threshold = _isInBackground
|
||||
? const Duration(seconds: 90)
|
||||
: const Duration(minutes: 2);
|
||||
return timeSinceUpdate > threshold;
|
||||
}
|
||||
|
||||
return false;
|
||||
|
||||
@@ -12,15 +12,45 @@ class SSEStreamParser {
|
||||
///
|
||||
/// Returns a stream of content strings extracted from OpenAI-style
|
||||
/// completion chunks.
|
||||
///
|
||||
/// [heartbeatTimeout] - Maximum time without data before considering
|
||||
/// the connection stale (default: 2 minutes)
|
||||
/// [onHeartbeat] - Callback invoked when any data is received
|
||||
static Stream<String> parseResponseStream(
|
||||
ResponseBody responseBody, {
|
||||
bool splitLargeDeltas = false,
|
||||
Duration heartbeatTimeout = const Duration(minutes: 2),
|
||||
void Function()? onHeartbeat,
|
||||
}) async* {
|
||||
DateTime lastDataReceived = DateTime.now();
|
||||
Timer? heartbeatTimer;
|
||||
|
||||
// Set up heartbeat monitoring
|
||||
if (heartbeatTimeout.inMilliseconds > 0) {
|
||||
heartbeatTimer = Timer.periodic(
|
||||
const Duration(seconds: 30),
|
||||
(timer) {
|
||||
final timeSinceLastData = DateTime.now().difference(lastDataReceived);
|
||||
if (timeSinceLastData > heartbeatTimeout) {
|
||||
DebugLogger.warning(
|
||||
'SSE stream heartbeat timeout: No data received for ${timeSinceLastData.inSeconds}s',
|
||||
data: {'timeout': heartbeatTimeout.inSeconds},
|
||||
);
|
||||
timer.cancel();
|
||||
}
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
try {
|
||||
// Buffer for accumulating incomplete SSE messages
|
||||
String buffer = '';
|
||||
|
||||
await for (final chunk in responseBody.stream) {
|
||||
// Update last data timestamp and invoke heartbeat callback
|
||||
lastDataReceived = DateTime.now();
|
||||
onHeartbeat?.call();
|
||||
|
||||
// Convert bytes to string (Dio ResponseBody.stream always emits Uint8List)
|
||||
final text = utf8.decode(chunk as List<int>, allowMalformed: true);
|
||||
buffer += text;
|
||||
@@ -68,6 +98,9 @@ class SSEStreamParser {
|
||||
stackTrace: stackTrace,
|
||||
);
|
||||
rethrow;
|
||||
} finally {
|
||||
// Clean up heartbeat timer
|
||||
heartbeatTimer?.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -16,6 +16,7 @@ import '../../shared/theme/theme_extensions.dart';
|
||||
import '../utils/debug_logger.dart';
|
||||
import '../utils/openwebui_source_parser.dart';
|
||||
import 'streaming_response_controller.dart';
|
||||
import 'api_service.dart';
|
||||
|
||||
// Keep local verbosity toggle for socket logs
|
||||
const bool kSocketVerboseLogging = false;
|
||||
@@ -67,7 +68,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
|
||||
required Map<String, dynamic> modelItem,
|
||||
required String sessionId,
|
||||
required String? activeConversationId,
|
||||
required dynamic api,
|
||||
required ApiService api,
|
||||
required SocketService? socketService,
|
||||
RegisterConversationDeltaListener? registerDeltaListener,
|
||||
// Message update callbacks
|
||||
@@ -169,6 +170,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
|
||||
'modelId': modelId,
|
||||
},
|
||||
);
|
||||
api.registerPersistentStreamForMessage(assistantMessageId, streamId);
|
||||
|
||||
InactivityWatchdog? socketWatchdog;
|
||||
final socketSubscriptions = <VoidCallback>[];
|
||||
@@ -318,8 +320,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
|
||||
if (chatId == null || chatId.isEmpty) {
|
||||
return;
|
||||
}
|
||||
if (api == null) return;
|
||||
|
||||
refreshingSnapshot = true;
|
||||
try {
|
||||
final conversation = await api.getConversation(chatId);
|
||||
@@ -376,7 +376,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
|
||||
try {
|
||||
// Fire and forget
|
||||
// ignore: unawaited_futures
|
||||
api?.sendChatCompleted(
|
||||
api.sendChatCompleted(
|
||||
chatId: activeConversationId ?? '',
|
||||
messageId: assistantMessageId,
|
||||
messages: const [],
|
||||
@@ -397,7 +397,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
|
||||
} catch (_) {}
|
||||
try {
|
||||
// ignore: unawaited_futures
|
||||
api?.sendChatCompleted(
|
||||
api.sendChatCompleted(
|
||||
chatId: activeConversationId ?? '',
|
||||
messageId: assistantMessageId,
|
||||
messages: const [],
|
||||
@@ -594,7 +594,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
|
||||
if (payload['done'] == true) {
|
||||
try {
|
||||
// ignore: unawaited_futures
|
||||
api?.sendChatCompleted(
|
||||
api.sendChatCompleted(
|
||||
chatId: activeConversationId ?? '',
|
||||
messageId: assistantMessageId,
|
||||
messages: const [],
|
||||
@@ -614,8 +614,8 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
|
||||
try {
|
||||
final chatId = activeConversationId;
|
||||
if (chatId != null && chatId.isNotEmpty) {
|
||||
final resp = await api?.dio.get('/api/v1/chats/$chatId');
|
||||
final data = resp?.data as Map<String, dynamic>?;
|
||||
final resp = await api.dio.get('/api/v1/chats/$chatId');
|
||||
final data = resp.data as Map<String, dynamic>?;
|
||||
String content = '';
|
||||
final chatObj = data?['chat'] as Map<String, dynamic>?;
|
||||
if (chatObj != null) {
|
||||
@@ -1137,6 +1137,10 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
|
||||
}
|
||||
},
|
||||
onComplete: () {
|
||||
api.clearPersistentStreamForMessage(
|
||||
assistantMessageId,
|
||||
expectedStreamId: streamId,
|
||||
);
|
||||
// Unregister from persistent service
|
||||
persistentService.unregisterStream(streamId);
|
||||
|
||||
@@ -1159,6 +1163,10 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
|
||||
},
|
||||
);
|
||||
|
||||
api.clearPersistentStreamForMessage(
|
||||
assistantMessageId,
|
||||
expectedStreamId: streamId,
|
||||
);
|
||||
try {
|
||||
persistentService.unregisterStream(streamId);
|
||||
} catch (_) {}
|
||||
|
||||
@@ -1447,7 +1447,7 @@ Future<void> regenerateMessage(
|
||||
modelItem: modelItem,
|
||||
sessionId: effectiveSessionId,
|
||||
activeConversationId: activeConversation.id,
|
||||
api: api,
|
||||
api: api!,
|
||||
socketService: socketService,
|
||||
registerDeltaListener: registerDeltaListener,
|
||||
appendToLastMessage: (c) =>
|
||||
@@ -1719,7 +1719,7 @@ Future<void> _sendMessageInternal(
|
||||
final List<String> ids = msg.attachmentIds ?? const <String>[];
|
||||
if (ids.isNotEmpty) {
|
||||
final messageMap = await _buildMessagePayloadWithAttachments(
|
||||
api: api,
|
||||
api: api!,
|
||||
role: msg.role,
|
||||
cleanedText: cleaned,
|
||||
attachmentIds: ids,
|
||||
@@ -1995,7 +1995,7 @@ Future<void> _sendMessageInternal(
|
||||
modelItem: modelItem,
|
||||
sessionId: effectiveSessionId,
|
||||
activeConversationId: activeConversation?.id,
|
||||
api: api,
|
||||
api: api!,
|
||||
socketService: socketService,
|
||||
registerDeltaListener: registerDeltaListener,
|
||||
appendToLastMessage: (c) =>
|
||||
|
||||
Reference in New Issue
Block a user