feat: background streaming of responses

This commit is contained in:
cogwheel0
2025-08-16 20:27:44 +05:30
parent 33fc26d755
commit 9be04ef2b9
23 changed files with 2676 additions and 322 deletions

View File

@@ -1,4 +1,5 @@
import 'package:flutter/material.dart';
import 'package:flutter/foundation.dart' as foundation;
import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'package:flutter_secure_storage/flutter_secure_storage.dart';
import 'package:shared_preferences/shared_preferences.dart';
@@ -139,7 +140,7 @@ final apiServiceProvider = Provider<ApiService?>((ref) {
// Keep legacy callback for backward compatibility during transition
apiService.onAuthTokenInvalid = () {
// This will be removed once migration is complete
debugPrint('DEBUG: Legacy auth invalidation callback triggered');
foundation.debugPrint('DEBUG: Legacy auth invalidation callback triggered');
};
// Initialize with any existing token immediately
@@ -176,7 +177,7 @@ final apiTokenUpdaterProvider = Provider<void>((ref) {
final api = ref.read(apiServiceProvider);
if (api != null && next != null && next.isNotEmpty) {
api.updateAuthToken(next);
debugPrint('DEBUG: Updated API service with unified auth token');
foundation.debugPrint('DEBUG: Updated API service with unified auth token');
}
});
});
@@ -229,17 +230,17 @@ final modelsProvider = FutureProvider<List<Model>>((ref) async {
if (api == null) return [];
try {
debugPrint('DEBUG: Fetching models from server');
foundation.debugPrint('DEBUG: Fetching models from server');
final models = await api.getModels();
debugPrint('DEBUG: Successfully fetched ${models.length} models');
foundation.debugPrint('DEBUG: Successfully fetched ${models.length} models');
return models;
} catch (e) {
debugPrint('ERROR: Failed to fetch models: $e');
foundation.debugPrint('ERROR: Failed to fetch models: $e');
// If models endpoint returns 403, this should now clear auth token
// and redirect user to login since it's marked as a core endpoint
if (e.toString().contains('403')) {
debugPrint(
foundation.debugPrint(
'DEBUG: Models endpoint returned 403 - authentication may be invalid',
);
}
@@ -267,25 +268,25 @@ final conversationsProvider = FutureProvider<List<Conversation>>((ref) async {
}
final api = ref.watch(apiServiceProvider);
if (api == null) {
debugPrint('DEBUG: No API service available');
foundation.debugPrint('DEBUG: No API service available');
return [];
}
try {
debugPrint('DEBUG: Fetching conversations from OpenWebUI API...');
foundation.debugPrint('DEBUG: Fetching conversations from OpenWebUI API...');
final conversations = await api.getConversations(limit: 50);
debugPrint(
foundation.debugPrint(
'DEBUG: Successfully fetched ${conversations.length} conversations',
);
return conversations;
} catch (e, stackTrace) {
debugPrint('DEBUG: Error fetching conversations: $e');
debugPrint('DEBUG: Stack trace: $stackTrace');
foundation.debugPrint('DEBUG: Error fetching conversations: $e');
foundation.debugPrint('DEBUG: Stack trace: $stackTrace');
// If conversations endpoint returns 403, this should now clear auth token
// and redirect user to login since it's marked as a core endpoint
if (e.toString().contains('403')) {
debugPrint(
foundation.debugPrint(
'DEBUG: Conversations endpoint returned 403 - authentication may be invalid',
);
}
@@ -307,9 +308,9 @@ final loadConversationProvider = FutureProvider.family<Conversation, String>((
throw Exception('No API service available');
}
debugPrint('DEBUG: Loading full conversation: $conversationId');
foundation.debugPrint('DEBUG: Loading full conversation: $conversationId');
final fullConversation = await api.getConversation(conversationId);
debugPrint(
foundation.debugPrint(
'DEBUG: Loaded conversation with ${fullConversation.messages.length} messages',
);
@@ -325,14 +326,14 @@ final defaultModelProvider = FutureProvider<Model?>((ref) async {
// Get all available models first
final models = await ref.read(modelsProvider.future);
if (models.isEmpty) {
debugPrint('DEBUG: No models available');
foundation.debugPrint('DEBUG: No models available');
return null;
}
// Check if a model is already selected
final currentSelected = ref.read(selectedModelProvider);
if (currentSelected != null) {
debugPrint('DEBUG: Model already selected: ${currentSelected.name}');
foundation.debugPrint('DEBUG: Model already selected: ${currentSelected.name}');
return currentSelected;
}
@@ -352,11 +353,11 @@ final defaultModelProvider = FutureProvider<Model?>((ref) async {
model.id.contains(defaultModelId) ||
model.name.contains(defaultModelId),
);
debugPrint(
foundation.debugPrint(
'DEBUG: Found server default model: ${selectedModel.name}',
);
} catch (e) {
debugPrint(
foundation.debugPrint(
'DEBUG: Default model "$defaultModelId" not found in available models',
);
selectedModel = models.first;
@@ -364,26 +365,26 @@ final defaultModelProvider = FutureProvider<Model?>((ref) async {
} else {
// No server default, use first available model
selectedModel = models.first;
debugPrint(
foundation.debugPrint(
'DEBUG: No server default model, using first available: ${selectedModel.name}',
);
}
} catch (apiError) {
debugPrint('DEBUG: Failed to get default model from server: $apiError');
foundation.debugPrint('DEBUG: Failed to get default model from server: $apiError');
// Use first available model as fallback
selectedModel = models.first;
debugPrint(
foundation.debugPrint(
'DEBUG: Using first available model as fallback: ${selectedModel.name}',
);
}
// Set the selected model
ref.read(selectedModelProvider.notifier).state = selectedModel;
debugPrint('DEBUG: Set default model: ${selectedModel.name}');
foundation.debugPrint('DEBUG: Set default model: ${selectedModel.name}');
return selectedModel;
} catch (e) {
debugPrint('DEBUG: Error setting default model: $e');
foundation.debugPrint('DEBUG: Error setting default model: $e');
// Final fallback: try to select any available model
try {
@@ -391,13 +392,13 @@ final defaultModelProvider = FutureProvider<Model?>((ref) async {
if (models.isNotEmpty) {
final fallbackModel = models.first;
ref.read(selectedModelProvider.notifier).state = fallbackModel;
debugPrint(
foundation.debugPrint(
'DEBUG: Fallback to first available model: ${fallbackModel.name}',
);
return fallbackModel;
}
} catch (fallbackError) {
debugPrint('DEBUG: Error in fallback model selection: $fallbackError');
foundation.debugPrint('DEBUG: Error in fallback model selection: $fallbackError');
}
return null;
@@ -415,15 +416,15 @@ final backgroundModelLoadProvider = Provider<void>((ref) {
// Wait a bit to ensure auth is complete
await Future.delayed(const Duration(milliseconds: 1500));
debugPrint('DEBUG: Starting background model loading');
foundation.debugPrint('DEBUG: Starting background model loading');
// Load default model in background
try {
await ref.read(defaultModelProvider.future);
debugPrint('DEBUG: Background model loading completed');
foundation.debugPrint('DEBUG: Background model loading completed');
} catch (e) {
// Ignore errors in background loading
debugPrint('DEBUG: Background model loading failed: $e');
foundation.debugPrint('DEBUG: Background model loading failed: $e');
}
});
@@ -448,7 +449,7 @@ final serverSearchProvider = FutureProvider.family<List<Conversation>, String>((
if (api == null) return [];
try {
debugPrint('DEBUG: Performing server-side search for: "$query"');
foundation.debugPrint('DEBUG: Performing server-side search for: "$query"');
// Use the new server-side search API
final searchResult = await api.searchChats(
@@ -467,10 +468,10 @@ final serverSearchProvider = FutureProvider.family<List<Conversation>, String>((
return Conversation.fromJson(data as Map<String, dynamic>);
}).toList();
debugPrint('DEBUG: Server search returned ${conversations.length} results');
foundation.debugPrint('DEBUG: Server search returned ${conversations.length} results');
return conversations;
} catch (e) {
debugPrint('DEBUG: Server search failed, fallback to local: $e');
foundation.debugPrint('DEBUG: Server search failed, fallback to local: $e');
// Fallback to local search if server search fails
final allConversations = await ref.read(conversationsProvider.future);
@@ -609,7 +610,7 @@ final userSettingsProvider = FutureProvider<UserSettings>((ref) async {
final settingsData = await api.getUserSettings();
return UserSettings.fromJson(settingsData);
} catch (e) {
debugPrint('DEBUG: Error fetching user settings: $e');
foundation.debugPrint('DEBUG: Error fetching user settings: $e');
// Return default settings on error
return const UserSettings();
}
@@ -625,7 +626,7 @@ final serverBannersProvider = FutureProvider<List<Map<String, dynamic>>>((
try {
return await api.getBanners();
} catch (e) {
debugPrint('DEBUG: Error fetching banners: $e');
foundation.debugPrint('DEBUG: Error fetching banners: $e');
return [];
}
});
@@ -640,7 +641,7 @@ final conversationSuggestionsProvider = FutureProvider<List<String>>((
try {
return await api.getSuggestions();
} catch (e) {
debugPrint('DEBUG: Error fetching suggestions: $e');
foundation.debugPrint('DEBUG: Error fetching suggestions: $e');
return [];
}
});
@@ -656,7 +657,7 @@ final foldersProvider = FutureProvider<List<Folder>>((ref) async {
.map((folderData) => Folder.fromJson(folderData))
.toList();
} catch (e) {
debugPrint('DEBUG: Error fetching folders: $e');
foundation.debugPrint('DEBUG: Error fetching folders: $e');
return [];
}
});
@@ -670,7 +671,7 @@ final userFilesProvider = FutureProvider<List<FileInfo>>((ref) async {
final filesData = await api.getUserFiles();
return filesData.map((fileData) => FileInfo.fromJson(fileData)).toList();
} catch (e) {
debugPrint('DEBUG: Error fetching files: $e');
foundation.debugPrint('DEBUG: Error fetching files: $e');
return [];
}
});
@@ -686,7 +687,7 @@ final fileContentProvider = FutureProvider.family<String, String>((
try {
return await api.getFileContent(fileId);
} catch (e) {
debugPrint('DEBUG: Error fetching file content: $e');
foundation.debugPrint('DEBUG: Error fetching file content: $e');
throw Exception('Failed to load file content: $e');
}
});
@@ -700,7 +701,7 @@ final knowledgeBasesProvider = FutureProvider<List<KnowledgeBase>>((ref) async {
final kbData = await api.getKnowledgeBases();
return kbData.map((data) => KnowledgeBase.fromJson(data)).toList();
} catch (e) {
debugPrint('DEBUG: Error fetching knowledge bases: $e');
foundation.debugPrint('DEBUG: Error fetching knowledge bases: $e');
return [];
}
});
@@ -716,7 +717,7 @@ final knowledgeBaseItemsProvider =
.map((data) => KnowledgeBaseItem.fromJson(data))
.toList();
} catch (e) {
debugPrint('DEBUG: Error fetching knowledge base items: $e');
foundation.debugPrint('DEBUG: Error fetching knowledge base items: $e');
return [];
}
});
@@ -729,7 +730,7 @@ final availableVoicesProvider = FutureProvider<List<String>>((ref) async {
try {
return await api.getAvailableVoices();
} catch (e) {
debugPrint('DEBUG: Error fetching voices: $e');
foundation.debugPrint('DEBUG: Error fetching voices: $e');
return [];
}
});
@@ -744,7 +745,7 @@ final imageModelsProvider = FutureProvider<List<Map<String, dynamic>>>((
try {
return await api.getImageModels();
} catch (e) {
debugPrint('DEBUG: Error fetching image models: $e');
foundation.debugPrint('DEBUG: Error fetching image models: $e');
return [];
}
});

View File

@@ -1,7 +1,6 @@
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:typed_data';
import 'package:flutter/foundation.dart';
import 'package:dio/dio.dart';
import 'package:http_parser/http_parser.dart';
@@ -17,6 +16,8 @@ import '../auth/api_auth_interceptor.dart';
import '../validation/validation_interceptor.dart';
import '../error/api_error_interceptor.dart';
import 'sse_parser.dart';
import 'stream_recovery_service.dart';
import 'persistent_streaming_service.dart';
class ApiService {
final Dio _dio;
@@ -713,7 +714,7 @@ class ApiService {
};
debugPrint('DEBUG: Sending chat data with proper parent-child structure');
debugPrint('DEBUG: Request data: ${chatData}');
debugPrint('DEBUG: Request data: $chatData');
final response = await _dio.post('/api/v1/chats/new', data: chatData);
@@ -2411,27 +2412,65 @@ class ApiService {
);
}
// SSE streaming with proper EventSource parser - Main Implementation
// SSE streaming with persistent background support - Main Implementation
void _streamSSE(
Map<String, dynamic> data,
StreamController<String> streamController,
String messageId,
) async {
try {
debugPrint('DEBUG: Making SSE request with parser to /api/chat/completions');
// Create a fresh Dio instance without interceptors for SSE streaming
// This avoids any interference from validation or other interceptors
final streamDio = Dio(BaseOptions(
final persistentService = PersistentStreamingService();
final recoveryService = StreamRecoveryService();
final streamId = DateTime.now().millisecondsSinceEpoch.toString();
// Extract metadata for recovery
final conversationId = data['conversation_id'] ?? data['chat_id'] ?? '';
final sessionId = data['session_id'] ?? const Uuid().v4().substring(0, 20);
// Register stream for recovery
recoveryService.registerStream(
streamId,
StreamRecoveryState(
baseUrl: serverConfig.url,
connectTimeout: const Duration(seconds: 30),
receiveTimeout: null, // No timeout for streaming
endpoint: '/api/chat/completions',
originalRequest: data,
headers: {
'Authorization': 'Bearer ${_authInterceptor.authToken}',
'Accept': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
},
),
);
// Recovery callback for persistent service
Future<void> recoveryCallback() async {
debugPrint('Persistent: Attempting to recover stream $streamId');
// Restart the streaming request
_streamSSE(data, streamController, messageId);
};
// Declare variables that need to be accessible in catch block
String? persistentStreamId;
try {
debugPrint('DEBUG: Making SSE request with parser to /api/chat/completions');
// Create a fresh Dio instance optimized for SSE streaming
final streamDio = Dio(BaseOptions(
baseUrl: serverConfig.url,
connectTimeout: const Duration(seconds: 60), // Longer for initial connection
receiveTimeout: null, // No timeout for streaming
sendTimeout: const Duration(seconds: 30),
headers: {
'Authorization': 'Bearer ${_authInterceptor.authToken}',
'Accept': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
...serverConfig.customHeaders, // Include any custom headers
},
validateStatus: (status) => status != null && status < 400,
followRedirects: true,
maxRedirects: 3,
));
debugPrint('DEBUG: Sending SSE request with data: ${jsonEncode(data)}');
@@ -2529,132 +2568,283 @@ class ApiService {
return;
}
// Parse SSE stream using our parser
// Parse SSE stream using enhanced parser with heartbeat monitoring
final rawStream = response.data.stream;
// Handle the stream properly based on its actual type
Stream<List<int>> byteStream;
if (rawStream is Stream<Uint8List>) {
// Convert Uint8List to List<int>
byteStream = rawStream.map((uint8list) => uint8list.toList());
} else {
byteStream = rawStream as Stream<List<int>>;
}
// Convert byte stream to string stream
final stringStream = byteStream.transform(utf8.decoder);
// Parse SSE events with enhanced parser (includes heartbeat monitoring)
final sseParser = SSEParser(heartbeatTimeout: const Duration(seconds: 45));
int contentIndex = 0;
int chunkSequence = 0;
String accumulatedContent = '';
// Parse SSE events from the string stream
final sseParser = SSEParser();
stringStream.listen(
(chunk) {
sseParser.feed(chunk);
// Monitor parser heartbeat for reconnection
sseParser.heartbeat.listen((_) {
debugPrint('Persistent: SSE heartbeat timeout detected');
});
sseParser.reconnectRequests.listen((lastEventId) {
debugPrint('Persistent: SSE reconnection requested, lastEventId: $lastEventId');
// The persistent service will handle the reconnection
});
// Convert bytes to SSE events
final sseEventStream = SSEParser.parseStream(
byteStream,
heartbeatTimeout: const Duration(seconds: 45),
);
// Listen to the SSE event stream
final streamSubscription = sseEventStream.listen(
(event) {
try {
chunkSequence++;
// Update parser with chunk data for heartbeat monitoring
sseParser.feed(''); // Reset heartbeat timer
// Process the event data
if (persistentStreamId != null) {
_processSseEvent(
event,
streamController,
chunkSequence,
accumulatedContent,
persistentService,
persistentStreamId,
);
}
// Update recovery state
recoveryService.updateStreamProgress(streamId, event.data, contentIndex++);
} catch (e) {
debugPrint('Persistent: Error processing SSE event: $e');
streamController.addError(e);
}
},
onDone: () {
sseParser.close();
debugPrint('Persistent: SSE stream completed normally');
if (persistentStreamId != null) {
persistentService.unregisterStream(persistentStreamId);
}
recoveryService.unregisterStream(streamId);
if (!streamController.isClosed) {
streamController.close();
}
},
onError: (error) {
debugPrint('DEBUG: SSE stream decode error: $error');
streamController.addError(error);
onError: (error) async {
debugPrint('Persistent: SSE stream error: $error');
// Try recovery through recovery service first
final recoveredStream = await recoveryService.recoverStream(streamId);
if (recoveredStream != null) {
debugPrint('Persistent: Successfully recovered SSE stream');
recoveredStream.listen(
(data) => streamController.add(data),
onDone: () {
if (persistentStreamId != null) {
persistentService.unregisterStream(persistentStreamId);
}
recoveryService.unregisterStream(streamId);
streamController.close();
},
onError: (e) {
if (persistentStreamId != null) {
persistentService.unregisterStream(persistentStreamId);
}
recoveryService.unregisterStream(streamId);
streamController.addError(e);
},
);
} else {
// Let persistent service handle recovery
debugPrint('Persistent: Delegating recovery to persistent service');
if (persistentStreamId != null) {
persistentService.unregisterStream(persistentStreamId);
}
recoveryService.unregisterStream(streamId);
streamController.addError(error);
}
},
cancelOnError: false, // Continue processing despite individual event errors
);
// Register with persistent streaming service now that subscription is created
persistentStreamId = persistentService.registerStream(
subscription: streamSubscription,
controller: streamController,
recoveryCallback: recoveryCallback,
metadata: {
'conversationId': conversationId,
'messageId': messageId,
'sessionId': sessionId,
'lastChunkSequence': 0,
'lastContent': '',
'endpoint': '/api/chat/completions',
'requestData': data,
},
);
final sseEvents = sseParser.stream;
debugPrint('DEBUG: Starting to process SSE events');
await for (final event in sseEvents) {
debugPrint('DEBUG: SSE event - type: ${event.event}, data: ${event.data}');
if (event.data == '[DONE]') {
debugPrint('DEBUG: SSE stream finished with [DONE]');
streamController.close();
return;
}
try {
final json = jsonDecode(event.data) as Map<String, dynamic>;
// Handle errors
if (json.containsKey('error')) {
final error = json['error'];
debugPrint('DEBUG: SSE error: $error');
streamController.addError('Server error: $error');
return;
}
// Handle content streaming
if (json.containsKey('choices')) {
final choices = json['choices'] as List?;
if (choices != null && choices.isNotEmpty) {
final choice = choices[0] as Map<String, dynamic>;
if (choice.containsKey('delta')) {
final delta = choice['delta'] as Map<String, dynamic>;
// Extract content
if (delta.containsKey('content')) {
final content = delta['content'] as String?;
if (content != null && content.isNotEmpty) {
debugPrint('DEBUG: SSE content chunk: "$content"');
streamController.add(content);
}
}
// Handle tool calls
if (delta.containsKey('tool_calls')) {
final toolCalls = delta['tool_calls'] as List?;
if (toolCalls != null && toolCalls.isNotEmpty) {
debugPrint('DEBUG: SSE tool calls: $toolCalls');
}
}
}
// Handle finish reason
if (choice.containsKey('finish_reason')) {
final finishReason = choice['finish_reason'];
if (finishReason != null) {
debugPrint('DEBUG: SSE finished with reason: $finishReason');
streamController.close();
return;
}
}
}
}
// Handle other event types
if (json.containsKey('sources')) {
debugPrint('DEBUG: SSE sources: ${json['sources']}');
}
if (json.containsKey('usage')) {
debugPrint('DEBUG: SSE usage: ${json['usage']}');
}
} catch (e) {
debugPrint('DEBUG: Error parsing SSE event data: $e');
// Continue processing
}
}
debugPrint('DEBUG: SSE stream ended');
streamController.close();
} catch (e) {
debugPrint('DEBUG: SSE streaming error: $e');
if (e is DioException) {
debugPrint('DEBUG: DioException details:');
debugPrint(' - Type: ${e.type}');
debugPrint(' - Message: ${e.message}');
debugPrint(' - Response: ${e.response}');
if (e.response != null) {
debugPrint(' - Status code: ${e.response!.statusCode}');
debugPrint(' - Headers: ${e.response!.headers}');
}
debugPrint('Persistent: Failed to create SSE stream: $e');
if (persistentStreamId != null) {
persistentService.unregisterStream(persistentStreamId);
}
recoveryService.unregisterStream(streamId);
if (e is DioException && e.response?.statusCode == 401) {
// Auth error - don't retry
streamController.addError('Authentication failed');
} else {
// Network or other error - trigger recovery
await recoveryCallback();
}
streamController.addError(e);
}
}
/// Process individual SSE events with content extraction and progress tracking
void _processSseEvent(
SSEEvent event,
StreamController<String> streamController,
int chunkSequence,
String accumulatedContent,
PersistentStreamingService persistentService,
String persistentStreamId,
) {
debugPrint('Persistent: SSE event - type: ${event.event}, data: ${event.data}');
// Handle completion signal
if (event.data == '[DONE]') {
debugPrint('Persistent: SSE stream finished with [DONE]');
if (!streamController.isClosed) {
streamController.close();
}
return;
}
try {
final json = jsonDecode(event.data) as Map<String, dynamic>;
// Handle errors
if (json.containsKey('error')) {
final error = json['error'];
debugPrint('Persistent: SSE error: $error');
streamController.addError('Server error: $error');
return;
}
// Handle content streaming
if (json.containsKey('choices')) {
final choices = json['choices'] as List?;
if (choices != null && choices.isNotEmpty) {
final choice = choices[0] as Map<String, dynamic>;
if (choice.containsKey('delta')) {
final delta = choice['delta'] as Map<String, dynamic>;
// Extract content
if (delta.containsKey('content')) {
final content = delta['content'] as String?;
if (content != null && content.isNotEmpty) {
debugPrint('Persistent: SSE content chunk: "$content"');
// Add content to stream
if (!streamController.isClosed) {
streamController.add(content);
}
// Update persistent service progress
persistentService.updateStreamProgress(
persistentStreamId,
chunkSequence: chunkSequence,
appendedContent: content,
);
accumulatedContent += content;
}
}
// Check for completion in delta
if (delta.containsKey('finish_reason')) {
final finishReason = delta['finish_reason'];
debugPrint('Persistent: Stream finished with reason: $finishReason');
if (!streamController.isClosed) {
streamController.close();
}
return;
}
} else if (choice.containsKey('finish_reason')) {
// Check for completion at choice level
final finishReason = choice['finish_reason'];
if (finishReason != null) {
debugPrint('Persistent: Stream finished with reason: $finishReason');
if (!streamController.isClosed) {
streamController.close();
}
return;
}
}
}
}
// Handle streaming chat/completions format variations
if (json.containsKey('delta')) {
final delta = json['delta'] as Map<String, dynamic>;
if (delta.containsKey('content')) {
final content = delta['content'] as String?;
if (content != null && content.isNotEmpty) {
debugPrint('Persistent: Direct delta content: "$content"');
if (!streamController.isClosed) {
streamController.add(content);
}
persistentService.updateStreamProgress(
persistentStreamId,
chunkSequence: chunkSequence,
appendedContent: content,
);
accumulatedContent += content;
}
}
}
// Handle OpenRouter-style streaming
if (json.containsKey('message')) {
final message = json['message'] as Map<String, dynamic>;
if (message.containsKey('content')) {
final content = message['content'] as String?;
if (content != null && content.isNotEmpty) {
debugPrint('Persistent: Message content: "$content"');
if (!streamController.isClosed) {
streamController.add(content);
}
persistentService.updateStreamProgress(
persistentStreamId,
chunkSequence: chunkSequence,
content: content, // Full content, not appended
);
}
}
}
} catch (e) {
debugPrint('Persistent: Error parsing SSE event data: $e');
// Don't fail the entire stream for one bad event
}
}
// Enhanced SSE parser that matches OpenWebUI's EventSourceParserStream approach
void _streamChatCompletionEnhanced(

View File

@@ -0,0 +1,289 @@
import 'dart:async';
import 'dart:io';
import 'package:flutter/foundation.dart';
import 'package:flutter/services.dart';
/// Handles background streaming continuation for iOS and Android
///
/// On iOS: Uses background tasks to keep streams alive for ~30 seconds
/// On Android: Uses foreground service notifications
class BackgroundStreamingHandler {
static const MethodChannel _channel = MethodChannel('conduit/background_streaming');
static BackgroundStreamingHandler? _instance;
static BackgroundStreamingHandler get instance => _instance ??= BackgroundStreamingHandler._();
BackgroundStreamingHandler._() {
_setupMethodCallHandler();
}
final Set<String> _activeStreamIds = <String>{};
final Map<String, StreamState> _streamStates = <String, StreamState>{};
// Callbacks for platform-specific events
void Function(List<String> streamIds)? onStreamsSuspending;
void Function()? onBackgroundTaskExpiring;
bool Function()? shouldContinueInBackground;
void _setupMethodCallHandler() {
_channel.setMethodCallHandler((call) async {
switch (call.method) {
case 'checkStreams':
return _activeStreamIds.length;
case 'streamsSuspending':
final Map<String, dynamic> args = call.arguments as Map<String, dynamic>;
final List<String> streamIds = (args['streamIds'] as List).cast<String>();
final String reason = args['reason'] as String;
debugPrint('Background: Streams suspending - $streamIds (reason: $reason)');
onStreamsSuspending?.call(streamIds);
// Save stream states for recovery
await _saveStreamStatesForRecovery(streamIds, reason);
break;
case 'backgroundTaskExpiring':
debugPrint('Background: Background task expiring');
onBackgroundTaskExpiring?.call();
break;
}
});
}
/// Start background execution for given stream IDs
Future<void> startBackgroundExecution(List<String> streamIds) async {
if (!Platform.isIOS && !Platform.isAndroid) return;
_activeStreamIds.addAll(streamIds);
try {
await _channel.invokeMethod('startBackgroundExecution', {
'streamIds': streamIds,
});
debugPrint('Background: Started background execution for ${streamIds.length} streams');
} catch (e) {
debugPrint('Background: Failed to start background execution: $e');
}
}
/// Stop background execution for given stream IDs
Future<void> stopBackgroundExecution(List<String> streamIds) async {
if (!Platform.isIOS && !Platform.isAndroid) return;
_activeStreamIds.removeAll(streamIds);
streamIds.forEach(_streamStates.remove);
try {
await _channel.invokeMethod('stopBackgroundExecution', {
'streamIds': streamIds,
});
debugPrint('Background: Stopped background execution for ${streamIds.length} streams');
} catch (e) {
debugPrint('Background: Failed to stop background execution: $e');
}
}
/// Register a stream with its current state
void registerStream(String streamId, {
required String conversationId,
required String messageId,
String? sessionId,
int? lastChunkSequence,
String? lastContent,
}) {
_streamStates[streamId] = StreamState(
streamId: streamId,
conversationId: conversationId,
messageId: messageId,
sessionId: sessionId,
lastChunkSequence: lastChunkSequence ?? 0,
lastContent: lastContent ?? '',
timestamp: DateTime.now(),
);
_activeStreamIds.add(streamId);
}
/// Update stream state with new chunk
void updateStreamState(String streamId, {
int? chunkSequence,
String? content,
String? appendedContent,
}) {
final state = _streamStates[streamId];
if (state == null) return;
_streamStates[streamId] = state.copyWith(
lastChunkSequence: chunkSequence ?? state.lastChunkSequence,
lastContent: appendedContent != null
? (state.lastContent + appendedContent)
: (content ?? state.lastContent),
timestamp: DateTime.now(),
);
}
/// Unregister a stream when it completes
void unregisterStream(String streamId) {
_activeStreamIds.remove(streamId);
_streamStates.remove(streamId);
}
/// Get current stream state for recovery
StreamState? getStreamState(String streamId) {
return _streamStates[streamId];
}
/// Keep alive the background task (iOS only)
Future<void> keepAlive() async {
if (!Platform.isIOS) return;
try {
await _channel.invokeMethod('keepAlive');
} catch (e) {
debugPrint('Background: Failed to keep alive: $e');
}
}
/// Recover stream states from previous app session
Future<List<StreamState>> recoverStreamStates() async {
if (!Platform.isIOS && !Platform.isAndroid) return [];
try {
final List<dynamic>? states = await _channel.invokeMethod('recoverStreamStates');
if (states == null) return [];
final recovered = <StreamState>[];
for (final stateData in states) {
final map = stateData as Map<String, dynamic>;
final state = StreamState.fromMap(map);
if (state != null) {
recovered.add(state);
_streamStates[state.streamId] = state;
}
}
debugPrint('Background: Recovered ${recovered.length} stream states');
return recovered;
} catch (e) {
debugPrint('Background: Failed to recover stream states: $e');
return [];
}
}
/// Save stream states for recovery after app restart
Future<void> _saveStreamStatesForRecovery(List<String> streamIds, String reason) async {
final statesToSave = streamIds
.map((id) => _streamStates[id])
.where((state) => state != null)
.map((state) => state!.toMap())
.toList();
try {
await _channel.invokeMethod('saveStreamStates', {
'states': statesToSave,
'reason': reason,
});
} catch (e) {
debugPrint('Background: Failed to save stream states: $e');
}
}
/// Check if any streams are currently active
bool get hasActiveStreams => _activeStreamIds.isNotEmpty;
/// Get list of active stream IDs
List<String> get activeStreamIds => _activeStreamIds.toList();
/// Clear all stream data (usually on app termination)
void clearAll() {
_activeStreamIds.clear();
_streamStates.clear();
}
}
/// Represents the state of a streaming request
class StreamState {
final String streamId;
final String conversationId;
final String messageId;
final String? sessionId;
final int lastChunkSequence;
final String lastContent;
final DateTime timestamp;
const StreamState({
required this.streamId,
required this.conversationId,
required this.messageId,
this.sessionId,
required this.lastChunkSequence,
required this.lastContent,
required this.timestamp,
});
StreamState copyWith({
String? streamId,
String? conversationId,
String? messageId,
String? sessionId,
int? lastChunkSequence,
String? lastContent,
DateTime? timestamp,
}) {
return StreamState(
streamId: streamId ?? this.streamId,
conversationId: conversationId ?? this.conversationId,
messageId: messageId ?? this.messageId,
sessionId: sessionId ?? this.sessionId,
lastChunkSequence: lastChunkSequence ?? this.lastChunkSequence,
lastContent: lastContent ?? this.lastContent,
timestamp: timestamp ?? this.timestamp,
);
}
Map<String, dynamic> toMap() {
return {
'streamId': streamId,
'conversationId': conversationId,
'messageId': messageId,
'sessionId': sessionId,
'lastChunkSequence': lastChunkSequence,
'lastContent': lastContent,
'timestamp': timestamp.millisecondsSinceEpoch,
};
}
static StreamState? fromMap(Map<String, dynamic> map) {
try {
return StreamState(
streamId: map['streamId'] as String,
conversationId: map['conversationId'] as String,
messageId: map['messageId'] as String,
sessionId: map['sessionId'] as String?,
lastChunkSequence: map['lastChunkSequence'] as int? ?? 0,
lastContent: map['lastContent'] as String? ?? '',
timestamp: DateTime.fromMillisecondsSinceEpoch(
map['timestamp'] as int? ?? DateTime.now().millisecondsSinceEpoch,
),
);
} catch (e) {
debugPrint('Failed to parse StreamState from map: $e');
return null;
}
}
/// Check if this state is stale (older than threshold)
bool isStale({Duration threshold = const Duration(minutes: 5)}) {
return DateTime.now().difference(timestamp) > threshold;
}
@override
String toString() {
return 'StreamState(streamId: $streamId, conversationId: $conversationId, '
'messageId: $messageId, sequence: $lastChunkSequence, '
'contentLength: ${lastContent.length}, timestamp: $timestamp)';
}
}

View File

@@ -20,6 +20,13 @@ class ConnectivityService {
Stream<ConnectivityStatus> get connectivityStream =>
_connectivityController.stream;
ConnectivityStatus get currentStatus => _lastStatus;
/// Stream that emits true when connected, false when offline
Stream<bool> get isConnected => connectivityStream
.map((status) => status == ConnectivityStatus.online);
/// Check if currently connected
bool get isCurrentlyConnected => _lastStatus == ConnectivityStatus.online;
void _startConnectivityMonitoring() {
// Initial check after a brief delay to avoid showing offline during startup

View File

@@ -0,0 +1,440 @@
import 'dart:async';
import 'package:flutter/material.dart';
import 'package:wakelock_plus/wakelock_plus.dart';
import 'package:dio/dio.dart';
import 'background_streaming_handler.dart';
import 'connectivity_service.dart';
class PersistentStreamingService with WidgetsBindingObserver {
static final PersistentStreamingService _instance = PersistentStreamingService._internal();
factory PersistentStreamingService() => _instance;
PersistentStreamingService._internal() {
_initialize();
}
// Active streams registry
final Map<String, StreamSubscription> _activeStreams = {};
final Map<String, StreamController> _streamControllers = {};
final Map<String, Function> _streamRecoveryCallbacks = {};
final Map<String, Map<String, dynamic>> _streamMetadata = {};
// App lifecycle state
// AppLifecycleState? _lastLifecycleState; // Removed as it's unused
bool _isInBackground = false;
Timer? _backgroundTimer;
Timer? _heartbeatTimer;
// Background streaming handler
late final BackgroundStreamingHandler _backgroundHandler;
// Connectivity monitoring
StreamSubscription<bool>? _connectivitySubscription;
bool _hasConnectivity = true;
// Recovery state
final Map<String, int> _retryAttempts = {};
static const int _maxRetryAttempts = 3;
static const Duration _retryDelay = Duration(seconds: 2);
void _initialize() {
WidgetsBinding.instance.addObserver(this);
_backgroundHandler = BackgroundStreamingHandler.instance;
_setupBackgroundHandlerCallbacks();
_setupConnectivityMonitoring();
_startHeartbeat();
}
void _setupBackgroundHandlerCallbacks() {
_backgroundHandler.onStreamsSuspending = (streamIds) {
debugPrint('PersistentStreaming: Streams suspending - $streamIds');
// Mark streams as suspended but don't close them yet
for (final streamId in streamIds) {
_markStreamAsSuspended(streamId);
}
};
_backgroundHandler.onBackgroundTaskExpiring = () {
debugPrint('PersistentStreaming: Background task expiring');
// Save states and prepare for recovery
_saveStreamStatesForRecovery();
};
_backgroundHandler.shouldContinueInBackground = () {
return _activeStreams.isNotEmpty;
};
}
void _setupConnectivityMonitoring() {
// Create a connectivity service instance - this would normally be injected
// For now, create a temporary instance just for monitoring
final connectivityService = ConnectivityService(Dio());
_connectivitySubscription = connectivityService.isConnected.listen((connected) {
final wasConnected = _hasConnectivity;
_hasConnectivity = connected;
if (!wasConnected && connected) {
// Connectivity restored - try to recover streams
debugPrint('PersistentStreaming: Connectivity restored, recovering streams');
_recoverActiveStreams();
} else if (wasConnected && !connected) {
// Connectivity lost - mark streams as suspended
debugPrint('PersistentStreaming: Connectivity lost, suspending streams');
_suspendAllStreams();
}
});
}
void _startHeartbeat() {
_heartbeatTimer = Timer.periodic(const Duration(seconds: 30), (_) {
if (_activeStreams.isNotEmpty && _isInBackground) {
_backgroundHandler.keepAlive();
}
});
}
@override
void didChangeAppLifecycleState(AppLifecycleState state) {
// _lastLifecycleState = state; // Removed as it's unused
switch (state) {
case AppLifecycleState.paused:
case AppLifecycleState.inactive:
_onAppBackground();
break;
case AppLifecycleState.resumed:
_onAppForeground();
break;
case AppLifecycleState.detached:
case AppLifecycleState.hidden:
// Handle app termination
_onAppDetached();
break;
}
}
void _onAppBackground() {
debugPrint('PersistentStreamingService: App went to background');
_isInBackground = true;
// Enable wake lock to prevent device sleep during streaming
if (_activeStreams.isNotEmpty) {
_enableWakeLock();
_startBackgroundExecution();
}
}
void _onAppForeground() {
debugPrint('PersistentStreamingService: App returned to foreground');
_isInBackground = false;
// Cancel background timer
_backgroundTimer?.cancel();
_backgroundTimer = null;
// Disable wake lock if no active streams
if (_activeStreams.isEmpty) {
_disableWakeLock();
}
// Check and recover any interrupted streams
_recoverActiveStreams();
}
void _onAppDetached() {
debugPrint('PersistentStreamingService: App detached');
// Save stream states for recovery
_saveStreamStatesForRecovery();
// Clean up
_backgroundTimer?.cancel();
_heartbeatTimer?.cancel();
_disableWakeLock();
}
// Register a stream for persistent handling
String registerStream({
required StreamSubscription subscription,
required StreamController controller,
Function? recoveryCallback,
Map<String, dynamic>? metadata,
}) {
final streamId = DateTime.now().millisecondsSinceEpoch.toString();
_activeStreams[streamId] = subscription;
_streamControllers[streamId] = controller;
if (recoveryCallback != null) {
_streamRecoveryCallbacks[streamId] = recoveryCallback;
}
// Store metadata for recovery
if (metadata != null) {
_streamMetadata[streamId] = metadata;
// Register with background handler
_backgroundHandler.registerStream(
streamId,
conversationId: metadata['conversationId'] ?? '',
messageId: metadata['messageId'] ?? '',
sessionId: metadata['sessionId'],
lastChunkSequence: metadata['lastChunkSequence'],
lastContent: metadata['lastContent'],
);
}
// Enable wake lock when streaming starts
if (_activeStreams.length == 1) {
_enableWakeLock();
}
// Start background execution if app is backgrounded
if (_isInBackground) {
_startBackgroundExecution();
}
debugPrint('PersistentStreamingService: Registered stream $streamId');
return streamId;
}
// Unregister a stream
void unregisterStream(String streamId) {
_activeStreams.remove(streamId);
_streamControllers.remove(streamId);
_streamRecoveryCallbacks.remove(streamId);
_streamMetadata.remove(streamId);
_retryAttempts.remove(streamId);
// Unregister from background handler
_backgroundHandler.unregisterStream(streamId);
// Stop background execution if no more streams
if (_activeStreams.isEmpty) {
_backgroundHandler.stopBackgroundExecution([streamId]);
_disableWakeLock();
}
debugPrint('PersistentStreamingService: Unregistered stream $streamId');
}
// Check if a stream is still active
bool isStreamActive(String streamId) {
return _activeStreams.containsKey(streamId);
}
// Recover interrupted streams
Future<void> _recoverActiveStreams() async {
if (!_hasConnectivity) {
debugPrint('PersistentStreaming: No connectivity, skipping recovery');
return;
}
// First, try to recover from background handler saved states
final savedStates = await _backgroundHandler.recoverStreamStates();
for (final state in savedStates) {
if (!state.isStale()) {
await _recoverStreamFromState(state);
}
}
// Then check active streams for recovery
for (final entry in _streamRecoveryCallbacks.entries) {
final streamId = entry.key;
final recoveryCallback = entry.value;
// Check if stream was interrupted or needs recovery
final subscription = _activeStreams[streamId];
if (subscription == null || _needsRecovery(streamId)) {
await _attemptStreamRecovery(streamId, recoveryCallback);
}
}
}
Future<void> _recoverStreamFromState(StreamState state) async {
final recoveryCallback = _streamRecoveryCallbacks[state.streamId];
if (recoveryCallback != null) {
debugPrint('PersistentStreaming: Recovering stream from saved state: ${state.streamId}');
await _attemptStreamRecovery(state.streamId, recoveryCallback);
}
}
Future<void> _attemptStreamRecovery(String streamId, Function recoveryCallback) async {
final attempts = _retryAttempts[streamId] ?? 0;
if (attempts >= _maxRetryAttempts) {
debugPrint('PersistentStreaming: Max retry attempts reached for stream $streamId');
return;
}
debugPrint('PersistentStreaming: Recovering stream $streamId (attempt ${attempts + 1})');
try {
_retryAttempts[streamId] = attempts + 1;
// Add exponential backoff delay
if (attempts > 0) {
final delay = _retryDelay * (1 << (attempts - 1)); // 2s, 4s, 8s...
await Future.delayed(delay);
}
// Call recovery callback to restart the stream
await recoveryCallback();
// Reset retry count on success
_retryAttempts.remove(streamId);
} catch (e) {
debugPrint('PersistentStreaming: Failed to recover stream $streamId: $e');
// Schedule next retry if under limit
if (_retryAttempts[streamId]! < _maxRetryAttempts) {
Timer(_retryDelay, () => _attemptStreamRecovery(streamId, recoveryCallback));
}
}
}
bool _needsRecovery(String streamId) {
final metadata = _streamMetadata[streamId];
if (metadata == null) return false;
// Check if stream has been inactive for too long
final lastUpdate = metadata['lastUpdate'] as DateTime?;
if (lastUpdate != null) {
final timeSinceUpdate = DateTime.now().difference(lastUpdate);
return timeSinceUpdate > const Duration(minutes: 1);
}
return false;
}
// Platform-specific background execution
void _startBackgroundExecution() {
if (_activeStreams.isNotEmpty) {
_backgroundHandler.startBackgroundExecution(_activeStreams.keys.toList());
}
}
void _markStreamAsSuspended(String streamId) {
final metadata = _streamMetadata[streamId];
if (metadata != null) {
metadata['suspended'] = true;
metadata['suspendedAt'] = DateTime.now();
}
}
void _suspendAllStreams() {
for (final streamId in _activeStreams.keys) {
_markStreamAsSuspended(streamId);
}
}
void _saveStreamStatesForRecovery() {
// The background handler will handle the actual saving
debugPrint('PersistentStreaming: Saving ${_activeStreams.length} stream states for recovery');
}
// Update stream metadata when chunks are received
void updateStreamProgress(String streamId, {
int? chunkSequence,
String? content,
String? appendedContent,
}) {
// Update background handler state
_backgroundHandler.updateStreamState(
streamId,
chunkSequence: chunkSequence,
content: content,
appendedContent: appendedContent,
);
// Update local metadata
final metadata = _streamMetadata[streamId];
if (metadata != null) {
metadata['lastUpdate'] = DateTime.now();
metadata['lastChunkSequence'] = chunkSequence ?? metadata['lastChunkSequence'];
if (appendedContent != null) {
metadata['lastContent'] = (metadata['lastContent'] ?? '') + appendedContent;
} else if (content != null) {
metadata['lastContent'] = content;
}
metadata['suspended'] = false; // Mark as active
}
}
// Wake lock management
void _enableWakeLock() async {
try {
await WakelockPlus.enable();
debugPrint('PersistentStreamingService: Wake lock enabled');
} catch (e) {
debugPrint('PersistentStreamingService: Failed to enable wake lock: $e');
}
}
void _disableWakeLock() async {
try {
await WakelockPlus.disable();
debugPrint('PersistentStreamingService: Wake lock disabled');
} catch (e) {
debugPrint('PersistentStreamingService: Failed to disable wake lock: $e');
}
}
// Get active stream count
int get activeStreamCount => _activeStreams.length;
// Get stream metadata
Map<String, dynamic>? getStreamMetadata(String streamId) {
return _streamMetadata[streamId];
}
// Check if stream is suspended
bool isStreamSuspended(String streamId) {
final metadata = _streamMetadata[streamId];
return metadata?['suspended'] == true;
}
// Force recovery of a specific stream
Future<void> forceRecoverStream(String streamId) async {
final recoveryCallback = _streamRecoveryCallbacks[streamId];
if (recoveryCallback != null) {
_retryAttempts.remove(streamId); // Reset retry count
await _attemptStreamRecovery(streamId, recoveryCallback);
}
}
// Cleanup
void dispose() {
WidgetsBinding.instance.removeObserver(this);
_backgroundTimer?.cancel();
_heartbeatTimer?.cancel();
_connectivitySubscription?.cancel();
_disableWakeLock();
// Stop all background execution
if (_activeStreams.isNotEmpty) {
_backgroundHandler.stopBackgroundExecution(_activeStreams.keys.toList());
}
// Cancel all active streams
for (final subscription in _activeStreams.values) {
subscription.cancel();
}
_activeStreams.clear();
// Close all controllers
for (final controller in _streamControllers.values) {
if (!controller.isClosed) {
controller.close();
}
}
_streamControllers.clear();
// Clear all metadata
_streamMetadata.clear();
_streamRecoveryCallbacks.clear();
_retryAttempts.clear();
// Clear background handler
_backgroundHandler.clearAll();
}
}

View File

@@ -306,7 +306,7 @@ class PlatformService {
return Switch(
value: value,
onChanged: onChanged,
activeColor: activeColor,
activeThumbColor: activeColor,
);
}
}

View File

@@ -1,5 +1,6 @@
import 'dart:async';
import 'dart:convert';
import 'package:flutter/foundation.dart';
/// Event data from Server-Sent Events stream
class SSEEvent {
@@ -16,7 +17,7 @@ class SSEEvent {
});
}
/// Parser for Server-Sent Events
/// Parser for Server-Sent Events with robust error handling and heartbeat support
class SSEParser {
final _controller = StreamController<SSEEvent>.broadcast();
@@ -26,35 +27,115 @@ class SSEParser {
String _currentData = '';
int? _currentRetry;
// Heartbeat and health monitoring
Timer? _heartbeatTimer;
DateTime _lastDataReceived = DateTime.now();
Duration _heartbeatTimeout = const Duration(seconds: 30);
bool _isClosed = false;
// Recovery state
String? _lastEventId;
bool _reconnectRequested = false;
Stream<SSEEvent> get stream => _controller.stream;
// Events for monitoring connection health
final _heartbeatController = StreamController<void>.broadcast();
final _reconnectController = StreamController<String?>.broadcast();
Stream<void> get heartbeat => _heartbeatController.stream;
Stream<String?> get reconnectRequests => _reconnectController.stream;
SSEParser({Duration? heartbeatTimeout}) {
if (heartbeatTimeout != null) {
_heartbeatTimeout = heartbeatTimeout;
}
_startHeartbeatTimer();
}
/// Feed raw text data to the parser
void feed(String chunk) {
if (_isClosed) return;
_lastDataReceived = DateTime.now();
_buffer += chunk;
_processBuffer();
// Reset heartbeat timer since we received data
_resetHeartbeatTimer();
}
void _startHeartbeatTimer() {
_heartbeatTimer?.cancel();
_heartbeatTimer = Timer(_heartbeatTimeout, _onHeartbeatTimeout);
}
void _resetHeartbeatTimer() {
if (!_isClosed) {
_startHeartbeatTimer();
}
}
void _onHeartbeatTimeout() {
debugPrint('SSEParser: Heartbeat timeout - no data received in ${_heartbeatTimeout.inSeconds}s');
if (!_isClosed) {
// Emit heartbeat timeout event
_heartbeatController.add(null);
// Request reconnection with last event ID for recovery
_reconnectRequested = true;
_reconnectController.add(_lastEventId);
}
}
/// Process buffered data and emit events
void _processBuffer() {
// Split by newlines but keep the last incomplete line
final lines = _buffer.split('\n');
// Keep the last line in buffer if it doesn't end with newline
if (!_buffer.endsWith('\n')) {
_buffer = lines.removeLast();
} else {
try {
// Handle potential Unicode boundary issues by checking for incomplete characters
if (_buffer.isNotEmpty && _hasIncompleteUnicode(_buffer)) {
// Keep buffer intact if it might contain incomplete Unicode
return;
}
// Split by newlines but keep the last incomplete line
final lines = _buffer.split('\n');
// Keep the last line in buffer if it doesn't end with newline
if (!_buffer.endsWith('\n')) {
_buffer = lines.removeLast();
} else {
_buffer = '';
}
for (final line in lines) {
_processLine(line);
}
} catch (e) {
debugPrint('SSEParser: Error processing buffer: $e');
// Reset buffer on parsing error to prevent cascading failures
_buffer = '';
}
}
bool _hasIncompleteUnicode(String text) {
if (text.isEmpty) return false;
for (final line in lines) {
_processLine(line);
}
// Check if the last few characters might be incomplete Unicode
// This is a simple heuristic - in practice, Dart's UTF-8 decoder handles this
final lastChar = text.codeUnitAt(text.length - 1);
// If it's a high surrogate, we might be missing the low surrogate
return lastChar >= 0xD800 && lastChar <= 0xDBFF;
}
/// Process a single line according to SSE spec
void _processLine(String line) {
// Handle carriage return if present (some servers use \r\n)
final cleanLine = line.replaceAll('\r', '');
// Empty line signals end of event
if (line.trim().isEmpty) {
if (cleanLine.trim().isEmpty) {
if (_currentData.isNotEmpty) {
_emitEvent();
}
@@ -62,27 +143,32 @@ class SSEParser {
return;
}
// Comment line (starts with :)
// OpenRouter sends ": OPENROUTER PROCESSING" messages
if (line.startsWith(':')) {
// Log but ignore comments
if (line.contains('OPENROUTER')) {
// OpenRouter processing indicator - ignore silently
// Comment line (starts with :) - these serve as keep-alives
if (cleanLine.startsWith(':')) {
// Treat comments as heartbeat signals
_lastDataReceived = DateTime.now();
_resetHeartbeatTimer();
// Log processing indicators but don't spam debug output
if (cleanLine.contains('OPENROUTER') && kDebugMode) {
debugPrint('SSEParser: OpenRouter processing...');
} else if (cleanLine.contains('PROCESSING') && kDebugMode) {
debugPrint('SSEParser: Server processing...');
}
return; // Ignore comments
return;
}
// Parse field and value
final colonIndex = line.indexOf(':');
final colonIndex = cleanLine.indexOf(':');
String field;
String value;
if (colonIndex == -1) {
field = line;
field = cleanLine;
value = '';
} else {
field = line.substring(0, colonIndex);
value = line.substring(colonIndex + 1);
field = cleanLine.substring(0, colonIndex);
value = cleanLine.substring(colonIndex + 1);
// Remove leading space from value if present
if (value.startsWith(' ')) {
value = value.substring(1);
@@ -104,6 +190,7 @@ class SSEParser {
case 'id':
_currentId = value;
_lastEventId = value; // Track for reconnection
break;
case 'retry':
@@ -121,12 +208,27 @@ class SSEParser {
/// Emit the current event
void _emitEvent() {
_controller.add(SSEEvent(
id: _currentId,
event: _currentEvent,
data: _currentData,
retry: _currentRetry,
));
if (_isClosed) return;
try {
final event = SSEEvent(
id: _currentId,
event: _currentEvent,
data: _currentData,
retry: _currentRetry,
);
_controller.add(event);
// Track last event ID for potential reconnection
if (_currentId != null) {
_lastEventId = _currentId;
}
} catch (e) {
debugPrint('SSEParser: Error emitting event: $e');
_controller.addError(e);
}
}
/// Reset current event state
@@ -138,42 +240,146 @@ class SSEParser {
/// Close the parser
void close() {
if (_isClosed) return;
_isClosed = true;
// Cancel heartbeat timer
_heartbeatTimer?.cancel();
_heartbeatTimer = null;
// Emit any remaining data
if (_currentData.isNotEmpty) {
_emitEvent();
}
// Close controllers
_controller.close();
_heartbeatController.close();
_reconnectController.close();
}
/// Parse SSE events from a stream of bytes
static Stream<SSEEvent> parseStream(Stream<List<int>> byteStream) {
final parser = SSEParser();
/// Get the last event ID for reconnection
String? get lastEventId => _lastEventId;
/// Check if parser is closed
bool get isClosed => _isClosed;
/// Check if reconnection was requested due to timeout
bool get reconnectRequested => _reconnectRequested;
/// Reset reconnect flag (call when reconnection is handled)
void resetReconnectFlag() {
_reconnectRequested = false;
}
/// Get time since last data was received
Duration get timeSinceLastData => DateTime.now().difference(_lastDataReceived);
/// Parse SSE events from a stream of bytes with robust error handling
static Stream<SSEEvent> parseStream(
Stream<List<int>> byteStream, {
Duration? heartbeatTimeout,
}) {
final parser = SSEParser(heartbeatTimeout: heartbeatTimeout);
// Convert bytes to text and feed to parser
byteStream
// Convert bytes to text and feed to parser with error recovery
StreamSubscription? subscription;
subscription = byteStream
.transform(utf8.decoder)
.listen(
(chunk) => parser.feed(chunk),
(chunk) {
try {
parser.feed(chunk);
} catch (e) {
debugPrint('SSEParser: Error feeding chunk: $e');
// Don't propagate feed errors - just skip the problematic chunk
}
},
onDone: () => parser.close(),
onError: (error) => parser._controller.addError(error),
onError: (error) {
debugPrint('SSEParser: Stream error: $error');
parser._controller.addError(error);
},
cancelOnError: false, // Continue processing despite errors
);
// Clean up subscription when parser is closed
parser._controller.onCancel = () {
subscription?.cancel();
};
return parser.stream;
}
}
/// Transform a text stream into SSE events
/// Transform a text stream into SSE events with heartbeat monitoring
class SSETransformer extends StreamTransformerBase<String, SSEEvent> {
final Duration? heartbeatTimeout;
const SSETransformer({this.heartbeatTimeout});
@override
Stream<SSEEvent> bind(Stream<String> stream) {
final parser = SSEParser();
final parser = SSEParser(heartbeatTimeout: heartbeatTimeout);
stream.listen(
(chunk) => parser.feed(chunk),
StreamSubscription? subscription;
subscription = stream.listen(
(chunk) {
try {
parser.feed(chunk);
} catch (e) {
debugPrint('SSETransformer: Error feeding chunk: $e');
// Continue processing despite errors
}
},
onDone: () => parser.close(),
onError: (error) => parser._controller.addError(error),
onError: (error) {
debugPrint('SSETransformer: Stream error: $error');
parser._controller.addError(error);
},
cancelOnError: false,
);
// Clean up subscription when parser is closed
parser._controller.onCancel = () {
subscription?.cancel();
};
return parser.stream;
}
}
/// Enhanced SSE event with additional metadata for resilient streaming
class EnhancedSSEEvent extends SSEEvent {
final DateTime timestamp;
final int sequenceNumber;
final String? sessionId;
EnhancedSSEEvent({
required super.data,
super.id,
super.event,
super.retry,
required this.timestamp,
required this.sequenceNumber,
this.sessionId,
});
factory EnhancedSSEEvent.fromSSEEvent(
SSEEvent event, {
required int sequenceNumber,
String? sessionId,
}) {
return EnhancedSSEEvent(
data: event.data,
id: event.id,
event: event.event,
retry: event.retry,
timestamp: DateTime.now(),
sequenceNumber: sequenceNumber,
sessionId: sessionId,
);
}
}

View File

@@ -0,0 +1,237 @@
import 'dart:async';
import 'dart:convert';
import 'package:flutter/foundation.dart';
import 'package:dio/dio.dart';
class StreamRecoveryService {
static const int maxRetries = 3;
static const Duration retryDelay = Duration(seconds: 2);
// Recovery state for each stream
final Map<String, StreamRecoveryState> _recoveryStates = {};
// Register a stream for recovery
void registerStream(String streamId, StreamRecoveryState state) {
_recoveryStates[streamId] = state;
debugPrint('StreamRecoveryService: Registered stream $streamId for recovery');
}
// Unregister a stream
void unregisterStream(String streamId) {
_recoveryStates.remove(streamId);
debugPrint('StreamRecoveryService: Unregistered stream $streamId');
}
// Attempt to recover a stream
Future<Stream<String>?> recoverStream(String streamId) async {
final state = _recoveryStates[streamId];
if (state == null) {
debugPrint('StreamRecoveryService: No recovery state for stream $streamId');
return null;
}
debugPrint('StreamRecoveryService: Attempting to recover stream $streamId');
debugPrint('StreamRecoveryService: Last received index: ${state.lastReceivedIndex}');
int retryCount = 0;
while (retryCount < maxRetries) {
try {
// Create recovery request with continuation token
final recoveryData = {
...state.originalRequest,
'continue_from_index': state.lastReceivedIndex,
'recovery_mode': true,
'stream_id': streamId,
};
// Add any accumulated content to avoid duplication
if (state.accumulatedContent.isNotEmpty) {
recoveryData['accumulated_content'] = state.accumulatedContent;
}
debugPrint('StreamRecoveryService: Recovery attempt ${retryCount + 1}/$maxRetries');
// Make recovery request
final dio = Dio(BaseOptions(
baseUrl: state.baseUrl,
connectTimeout: const Duration(seconds: 30),
receiveTimeout: null, // No timeout for streaming
headers: state.headers,
));
final response = await dio.post(
state.endpoint,
data: recoveryData,
options: Options(
headers: {
'Accept': 'text/event-stream',
'Cache-Control': 'no-cache',
},
responseType: ResponseType.stream,
),
);
if (response.statusCode == 200) {
debugPrint('StreamRecoveryService: Successfully recovered stream $streamId');
// Create new stream from recovered response
final stream = _processRecoveredStream(
response.data.stream,
state,
streamId,
);
return stream;
}
} catch (e) {
debugPrint('StreamRecoveryService: Recovery attempt failed: $e');
retryCount++;
if (retryCount < maxRetries) {
await Future.delayed(retryDelay * retryCount);
}
}
}
debugPrint('StreamRecoveryService: Failed to recover stream $streamId after $maxRetries attempts');
return null;
}
// Process recovered stream and filter out duplicates
Stream<String> _processRecoveredStream(
Stream<List<int>> rawStream,
StreamRecoveryState state,
String streamId,
) {
final controller = StreamController<String>();
String buffer = '';
bool skipUntilNewContent = state.lastReceivedIndex > 0;
int currentIndex = 0;
rawStream.listen(
(chunk) {
final text = utf8.decode(chunk, allowMalformed: true);
buffer += text;
// Process complete SSE events
while (buffer.contains('\n')) {
final lineEnd = buffer.indexOf('\n');
final line = buffer.substring(0, lineEnd).trim();
buffer = buffer.substring(lineEnd + 1);
if (line.startsWith('data: ')) {
final data = line.substring(6);
if (data == '[DONE]') {
controller.close();
return;
}
// Parse JSON data
try {
final json = jsonDecode(data);
// Check if we should skip this content (already received)
if (skipUntilNewContent) {
currentIndex++;
if (currentIndex <= state.lastReceivedIndex) {
debugPrint('StreamRecoveryService: Skipping duplicate content at index $currentIndex');
continue;
}
skipUntilNewContent = false;
}
// Extract content from JSON
if (json['choices'] != null && json['choices'].isNotEmpty) {
final delta = json['choices'][0]['delta'];
if (delta != null && delta['content'] != null) {
final content = delta['content'] as String;
// Update recovery state
state.lastReceivedIndex = currentIndex;
state.accumulatedContent += content;
// Emit recovered content
controller.add(content);
currentIndex++;
}
}
} catch (e) {
debugPrint('StreamRecoveryService: Error parsing recovered data: $e');
}
}
}
},
onDone: () {
debugPrint('StreamRecoveryService: Recovered stream completed');
controller.close();
unregisterStream(streamId);
},
onError: (error) {
debugPrint('StreamRecoveryService: Recovered stream error: $error');
controller.addError(error);
// Attempt another recovery
Future.delayed(retryDelay, () async {
final recoveredStream = await recoverStream(streamId);
if (recoveredStream != null) {
recoveredStream.listen(
(data) => controller.add(data),
onDone: () => controller.close(),
onError: (e) => controller.addError(e),
);
} else {
controller.close();
}
});
},
);
return controller.stream;
}
// Update recovery state with new content
void updateStreamProgress(String streamId, String content, int index) {
final state = _recoveryStates[streamId];
if (state != null) {
state.lastReceivedIndex = index;
state.accumulatedContent += content;
}
}
// Clear recovery state for a stream
void clearStreamState(String streamId) {
_recoveryStates.remove(streamId);
}
}
// Recovery state for a stream
class StreamRecoveryState {
final String baseUrl;
final String endpoint;
final Map<String, dynamic> originalRequest;
final Map<String, String> headers;
int lastReceivedIndex;
String accumulatedContent;
DateTime lastActivity;
StreamRecoveryState({
required this.baseUrl,
required this.endpoint,
required this.originalRequest,
required this.headers,
this.lastReceivedIndex = 0,
this.accumulatedContent = '',
}) : lastActivity = DateTime.now();
// Check if stream is stale (no activity for too long)
bool get isStale {
return DateTime.now().difference(lastActivity).inMinutes > 5;
}
// Update activity timestamp
void updateActivity() {
lastActivity = DateTime.now();
}
}