From d3742944bce2604269d7c0e17c56126a3666d7ec Mon Sep 17 00:00:00 2001 From: cogwheel0 <172976095+cogwheel0@users.noreply.github.com> Date: Tue, 19 Aug 2025 13:35:32 +0530 Subject: [PATCH] chore: formatting --- lib/core/services/api_service.dart | 535 ++++++++++++++++------------- 1 file changed, 299 insertions(+), 236 deletions(-) diff --git a/lib/core/services/api_service.dart b/lib/core/services/api_service.dart index 5ca0672..e68930c 100644 --- a/lib/core/services/api_service.dart +++ b/lib/core/services/api_service.dart @@ -47,7 +47,7 @@ class ApiService { ) { // Use API key from server config if provided and no explicit auth token final effectiveAuthToken = authToken ?? serverConfig.apiKey; - + // Initialize the consistent auth interceptor _authInterceptor = ApiAuthInterceptor( authToken: effectiveAuthToken, @@ -90,15 +90,19 @@ class ApiService { debugPrint('Method: ${options.method}'); debugPrint('Headers: ${options.headers}'); debugPrint('Content-Type: ${options.contentType}'); - + // Log the raw data being sent if (options.data != null) { if (options.data is Map) { final dataMap = options.data as Map; debugPrint('Data type: Map'); debugPrint('Data keys: ${dataMap.keys.toList()}'); - debugPrint('Has background_tasks: ${dataMap.containsKey('background_tasks')}'); - debugPrint('Has session_id: ${dataMap.containsKey('session_id')}'); + debugPrint( + 'Has background_tasks: ${dataMap.containsKey('background_tasks')}', + ); + debugPrint( + 'Has session_id: ${dataMap.containsKey('session_id')}', + ); debugPrint('Has id: ${dataMap.containsKey('id')}'); debugPrint('Full data: ${jsonEncode(dataMap)}'); } else { @@ -112,7 +116,7 @@ class ApiService { }, ), ); - + // 5. Standard logging interceptor _dio.interceptors.add( LogInterceptor( @@ -324,56 +328,63 @@ class ApiService { debugPrint('DEBUG: Auth token present: ${authToken != null}'); List allRegularChats = []; - + if (limit == null) { // Fetch all conversations using pagination debugPrint('DEBUG: Fetching ALL conversations using pagination'); int currentPage = 0; - + while (true) { debugPrint('DEBUG: Fetching page $currentPage'); final response = await _dio.get( '/api/v1/chats/', queryParameters: {'page': currentPage}, ); - + if (response.data is! List) { - throw Exception('Expected array of chats, got ${response.data.runtimeType}'); + throw Exception( + 'Expected array of chats, got ${response.data.runtimeType}', + ); } - + final pageChats = response.data as List; - debugPrint('DEBUG: Page $currentPage returned ${pageChats.length} conversations'); - + debugPrint( + 'DEBUG: Page $currentPage returned ${pageChats.length} conversations', + ); + if (pageChats.isEmpty) { debugPrint('DEBUG: No more conversations, stopping pagination'); break; } - + allRegularChats.addAll(pageChats); currentPage++; - + // Safety break to avoid infinite loops (adjust as needed) if (currentPage > 100) { - debugPrint('WARNING: Reached maximum page limit (100), stopping pagination'); + debugPrint( + 'WARNING: Reached maximum page limit (100), stopping pagination', + ); break; } } - - debugPrint('DEBUG: Fetched total of ${allRegularChats.length} conversations across $currentPage pages'); + + debugPrint( + 'DEBUG: Fetched total of ${allRegularChats.length} conversations across $currentPage pages', + ); } else { // Original single page fetch final regularResponse = await _dio.get( '/api/v1/chats/', - queryParameters: { - if (limit > 0) - 'page': ((skip ?? 0) / limit).floor(), - }, + queryParameters: {if (limit > 0) 'page': ((skip ?? 0) / limit).floor()}, ); - + if (regularResponse.data is! List) { - throw Exception('Expected array of chats, got ${regularResponse.data.runtimeType}'); + throw Exception( + 'Expected array of chats, got ${regularResponse.data.runtimeType}', + ); } - + allRegularChats = regularResponse.data as List; } @@ -381,7 +392,9 @@ class ApiService { final archivedResponse = await _dio.get('/api/v1/chats/all/archived'); debugPrint('DEBUG: Pinned response status: ${pinnedResponse.statusCode}'); - debugPrint('DEBUG: Archived response status: ${archivedResponse.statusCode}'); + debugPrint( + 'DEBUG: Archived response status: ${archivedResponse.statusCode}', + ); if (pinnedResponse.data is! List) { throw Exception( @@ -438,16 +451,23 @@ class ApiService { for (final chatData in regularChatList) { try { // Debug: Check if conversation has folder_id in raw data - if (chatData.containsKey('folder_id') && chatData['folder_id'] != null) { - debugPrint('🔍 DEBUG: Found conversation with folder_id in raw data: ${chatData['id']} -> ${chatData['folder_id']}'); + if (chatData.containsKey('folder_id') && + chatData['folder_id'] != null) { + debugPrint( + '🔍 DEBUG: Found conversation with folder_id in raw data: ${chatData['id']} -> ${chatData['folder_id']}', + ); } - + // Debug: Check what fields are available in the chat data if (regularChatList.indexOf(chatData) == 0) { - debugPrint('🔍 DEBUG: Sample chat data fields: ${chatData.keys.toList()}'); - debugPrint('🔍 DEBUG: Sample chat data: ${chatData.toString().substring(0, 200)}...'); + debugPrint( + '🔍 DEBUG: Sample chat data fields: ${chatData.keys.toList()}', + ); + debugPrint( + '🔍 DEBUG: Sample chat data: ${chatData.toString().substring(0, 200)}...', + ); } - + final conversation = _parseOpenWebUIChat(chatData); // Only add if not already added as pinned or archived if (!pinnedIds.contains(conversation.id) && @@ -521,10 +541,12 @@ class ApiService { final archived = chatData['archived'] as bool? ?? false; final shareId = chatData['share_id'] as String?; final folderId = chatData['folder_id'] as String?; - + // Debug logging for folder assignment if (folderId != null) { - debugPrint('🔍 DEBUG: Conversation ${id.substring(0, 8)} has folderId: $folderId'); + debugPrint( + '🔍 DEBUG: Conversation ${id.substring(0, 8)} has folderId: $folderId', + ); } debugPrint( @@ -561,10 +583,10 @@ class ApiService { Conversation _parseFullOpenWebUIChat(Map chatData) { debugPrint('DEBUG: Parsing full OpenWebUI chat data'); debugPrint('DEBUG: Chat data keys: ${chatData.keys.toList()}'); - + final id = chatData['id'] as String; final title = chatData['title'] as String; - + debugPrint('DEBUG: Parsed chat ID: $id'); debugPrint('DEBUG: Parsed chat title: $title'); @@ -581,7 +603,7 @@ class ApiService { // Parse messages from the 'chat' object or top-level messages final chatObject = chatData['chat'] as Map?; final messages = []; - + // Extract model from chat.models array String? model; if (chatObject != null && chatObject['models'] != null) { @@ -610,11 +632,13 @@ class ApiService { debugPrint( 'DEBUG: Found ${messagesMap.length} messages in chat.history.messages (converting to list)', ); - + // Convert map to list format to use common parsing logic messagesList = []; for (final entry in messagesMap.entries) { - final msgData = Map.from(entry.value as Map); + final msgData = Map.from( + entry.value as Map, + ); msgData['id'] = entry.key; // Use the key as the message ID messagesList.add(msgData); } @@ -704,7 +728,7 @@ class ApiService { .where((file) => file is Map && file['file_id'] != null) .map((file) => file['file_id'] as String) .toList(); - + if (attachmentIds.isEmpty) { attachmentIds = null; } @@ -738,7 +762,7 @@ class ApiService { for (final msg in messages) { final messageId = msg.id; - + // Build message for history.messages map messagesMap[messageId] = { 'id': messageId, @@ -749,12 +773,12 @@ class ApiService { 'timestamp': msg.timestamp.millisecondsSinceEpoch ~/ 1000, if (msg.role == 'user' && model != null) 'models': [model], }; - + // Update parent's childrenIds if there's a previous message if (previousId != null && messagesMap.containsKey(previousId)) { (messagesMap[previousId]['childrenIds'] as List).add(messageId); } - + // Build message for messages array messagesArray.add({ 'id': messageId, @@ -765,7 +789,7 @@ class ApiService { 'timestamp': msg.timestamp.millisecondsSinceEpoch ~/ 1000, if (msg.role == 'user' && model != null) 'models': [model], }); - + previousId = messageId; currentId = messageId; } @@ -793,7 +817,9 @@ class ApiService { final response = await _dio.post('/api/v1/chats/new', data: chatData); - debugPrint('DEBUG: Create conversation response status: ${response.statusCode}'); + debugPrint( + 'DEBUG: Create conversation response status: ${response.statusCode}', + ); debugPrint('DEBUG: Create conversation response data: ${response.data}'); // Parse the response @@ -821,7 +847,7 @@ class ApiService { for (final msg in messages) { final messageId = msg.id; - + // Build message for messages map (history.messages) messagesMap[messageId] = { 'id': messageId, @@ -831,19 +857,20 @@ class ApiService { 'content': msg.content, 'timestamp': msg.timestamp.millisecondsSinceEpoch ~/ 1000, if (msg.role == 'assistant' && msg.model != null) 'model': msg.model, - if (msg.role == 'assistant' && msg.model != null) 'modelName': msg.model, + if (msg.role == 'assistant' && msg.model != null) + 'modelName': msg.model, if (msg.role == 'assistant') 'modelIdx': 0, if (msg.role == 'assistant') 'done': true, if (msg.role == 'user' && model != null) 'models': [model], - if (msg.attachmentIds != null && msg.attachmentIds!.isNotEmpty) + if (msg.attachmentIds != null && msg.attachmentIds!.isNotEmpty) 'files': msg.attachmentIds!.map((id) => {'file_id': id}).toList(), }; - + // Update parent's childrenIds if (previousId != null && messagesMap.containsKey(previousId)) { (messagesMap[previousId]['childrenIds'] as List).add(messageId); } - + // Build message for messages array messagesArray.add({ 'id': messageId, @@ -853,14 +880,15 @@ class ApiService { 'content': msg.content, 'timestamp': msg.timestamp.millisecondsSinceEpoch ~/ 1000, if (msg.role == 'assistant' && msg.model != null) 'model': msg.model, - if (msg.role == 'assistant' && msg.model != null) 'modelName': msg.model, + if (msg.role == 'assistant' && msg.model != null) + 'modelName': msg.model, if (msg.role == 'assistant') 'modelIdx': 0, if (msg.role == 'assistant') 'done': true, if (msg.role == 'user' && model != null) 'models': [model], - if (msg.attachmentIds != null && msg.attachmentIds!.isNotEmpty) + if (msg.attachmentIds != null && msg.attachmentIds!.isNotEmpty) 'files': msg.attachmentIds!.map((id) => {'file_id': id}).toList(), }); - + previousId = messageId; currentId = messageId; } @@ -868,7 +896,7 @@ class ApiService { // Create the chat data structure matching OpenWebUI format exactly final chatData = { 'chat': { - if (title != null) 'title': title, // Include the title if provided + if (title != null) 'title': title, // Include the title if provided 'models': model != null ? [model] : [], 'messages': messagesArray, 'history': { @@ -950,8 +978,6 @@ class ApiService { await _dio.post('/api/v1/users/user/settings', data: settings); } - - // Suggestions Future> getSuggestions() async { debugPrint('DEBUG: Fetching conversation suggestions'); @@ -985,7 +1011,7 @@ class ApiService { final response = await _dio.get('/api/v1/folders/'); debugPrint('DEBUG: Folders response status: ${response.statusCode}'); debugPrint('DEBUG: Folders response data: ${response.data}'); - + final data = response.data; if (data is List) { debugPrint('DEBUG: Found ${data.length} folders'); @@ -1496,12 +1522,14 @@ class ApiService { Map? modelItem, String? sessionId, }) async { - debugPrint('DEBUG: Sending chat completed notification (optional endpoint)'); - + debugPrint( + 'DEBUG: Sending chat completed notification (optional endpoint)', + ); + // This endpoint appears to be optional or deprecated in newer OpenWebUI versions // The main chat synchronization happens through /api/v1/chats/{id} updates // We'll still try to call it but won't fail if it doesn't work - + // Format messages to match OpenWebUI expected structure // Note: Removing 'id' field as it causes 400 error final formattedMessages = messages.map((msg) { @@ -1509,9 +1537,10 @@ class ApiService { // Don't include 'id' - it causes 400 error with detail: 'id' 'role': msg['role'], 'content': msg['content'], - 'timestamp': msg['timestamp'] ?? DateTime.now().millisecondsSinceEpoch ~/ 1000, + 'timestamp': + msg['timestamp'] ?? DateTime.now().millisecondsSinceEpoch ~/ 1000, }; - + // Add model info for assistant messages if (msg['role'] == 'assistant') { formatted['model'] = model; @@ -1519,17 +1548,18 @@ class ApiService { formatted['usage'] = msg['usage']; } } - + return formatted; }).toList(); // Include the message ID and session ID at the top level - server expects these final requestData = { - 'id': messageId, // The server expects the assistant message ID here + 'id': messageId, // The server expects the assistant message ID here 'chat_id': chatId, 'model': model, 'messages': formattedMessages, - 'session_id': sessionId ?? const Uuid().v4().substring(0, 20), // Add session_id + 'session_id': + sessionId ?? const Uuid().v4().substring(0, 20), // Add session_id // Don't include model_item as it might not be expected }; @@ -1541,7 +1571,9 @@ class ApiService { debugPrint('DEBUG: Chat completed response: ${response.statusCode}'); } catch (e) { // This is a non-critical endpoint - main sync happens via /api/v1/chats/{id} - debugPrint('DEBUG: Chat completed endpoint not available or failed (non-critical): $e'); + debugPrint( + 'DEBUG: Chat completed endpoint not available or failed (non-critical): $e', + ); } } @@ -2379,8 +2411,7 @@ class ApiService { // Send message with SSE streaming // Returns a record with (stream, messageId, sessionId) - ({Stream stream, String messageId, String sessionId}) - sendMessage({ + ({Stream stream, String messageId, String sessionId}) sendMessage({ required List> messages, required String model, String? conversationId, @@ -2450,12 +2481,12 @@ class ApiService { 'model': model, 'messages': processedMessages, }; - + // Add only essential parameters if (conversationId != null) { data['chat_id'] = conversationId; } - + // Add web search flag if enabled if (enableWebSearch) { data['web_search'] = true; @@ -2468,7 +2499,7 @@ class ApiService { }; debugPrint('DEBUG: Web search enabled in SSE request'); } - + // Don't add session_id or id - they break SSE streaming! // The server falls back to task-based async when these are present @@ -2478,12 +2509,14 @@ class ApiService { // Debug the data being sent debugPrint('DEBUG: SSE request data keys: ${data.keys.toList()}'); - debugPrint('DEBUG: Has background_tasks: ${data.containsKey('background_tasks')}'); + debugPrint( + 'DEBUG: Has background_tasks: ${data.containsKey('background_tasks')}', + ); debugPrint('DEBUG: Has session_id: ${data.containsKey('session_id')}'); debugPrint('DEBUG: background_tasks value: ${data['background_tasks']}'); debugPrint('DEBUG: session_id value: ${data['session_id']}'); debugPrint('DEBUG: id value: ${data['id']}'); - + // Use SSE streaming with proper parser _streamSSE(data, streamController, messageId); @@ -2503,11 +2536,11 @@ class ApiService { final persistentService = PersistentStreamingService(); final recoveryService = StreamRecoveryService(); final streamId = DateTime.now().millisecondsSinceEpoch.toString(); - + // Extract metadata for recovery final conversationId = data['conversation_id'] ?? data['chat_id'] ?? ''; final sessionId = data['session_id'] ?? const Uuid().v4().substring(0, 20); - + // Register stream for recovery recoveryService.registerStream( streamId, @@ -2523,40 +2556,46 @@ class ApiService { }, ), ); - + // Recovery callback for persistent service Future recoveryCallback() async { debugPrint('Persistent: Attempting to recover stream $streamId'); // Restart the streaming request _streamSSE(data, streamController, messageId); } - + // Declare variables that need to be accessible in catch block String? persistentStreamId; - + try { - debugPrint('DEBUG: Making SSE request with parser to /api/chat/completions'); - + debugPrint( + 'DEBUG: Making SSE request with parser to /api/chat/completions', + ); + // Create a fresh Dio instance optimized for SSE streaming - final streamDio = Dio(BaseOptions( - baseUrl: serverConfig.url, - connectTimeout: const Duration(seconds: 60), // Longer for initial connection - receiveTimeout: null, // No timeout for streaming - sendTimeout: const Duration(seconds: 30), - headers: { - 'Authorization': 'Bearer ${_authInterceptor.authToken}', - 'Accept': 'text/event-stream', - 'Cache-Control': 'no-cache', - 'Connection': 'keep-alive', - ...serverConfig.customHeaders, // Include any custom headers - }, - validateStatus: (status) => status != null && status < 400, - followRedirects: true, - maxRedirects: 3, - )); - + final streamDio = Dio( + BaseOptions( + baseUrl: serverConfig.url, + connectTimeout: const Duration( + seconds: 60, + ), // Longer for initial connection + receiveTimeout: null, // No timeout for streaming + sendTimeout: const Duration(seconds: 30), + headers: { + 'Authorization': 'Bearer ${_authInterceptor.authToken}', + 'Accept': 'text/event-stream', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + ...serverConfig.customHeaders, // Include any custom headers + }, + validateStatus: (status) => status != null && status < 400, + followRedirects: true, + maxRedirects: 3, + ), + ); + debugPrint('DEBUG: Sending SSE request with data: ${jsonEncode(data)}'); - + final response = await streamDio.post( '/api/chat/completions', data: data, // Pass data directly as Map @@ -2568,37 +2607,43 @@ class ApiService { debugPrint('DEBUG: SSE response status: ${response.statusCode}'); debugPrint('DEBUG: SSE response headers: ${response.headers}'); - debugPrint('DEBUG: SSE content-type: ${response.headers.value('content-type')}'); + debugPrint( + 'DEBUG: SSE content-type: ${response.headers.value('content-type')}', + ); if (response.statusCode != 200) { - throw Exception('HTTP ${response.statusCode}: Failed to start streaming'); + throw Exception( + 'HTTP ${response.statusCode}: Failed to start streaming', + ); } // Check if we got SSE or JSON response final contentType = response.headers.value('content-type') ?? ''; if (!contentType.contains('text/event-stream')) { debugPrint('WARNING: Expected SSE but got content-type: $contentType'); - debugPrint('WARNING: This usually means the server didn\'t receive the streaming parameters'); - + debugPrint( + 'WARNING: This usually means the server didn\'t receive the streaming parameters', + ); + // Try to read the response to see what we got final stream = response.data.stream as Stream>; final bytes = await stream.toList(); final fullBytes = bytes.expand((x) => x).toList(); final responseText = utf8.decode(fullBytes); debugPrint('DEBUG: Non-SSE response length: ${responseText.length}'); - + // If it's JSON, parse and handle it if (contentType.contains('application/json')) { try { final json = jsonDecode(responseText); - + // Check if it's an error if (json is Map && json.containsKey('error')) { debugPrint('ERROR: Server returned error: ${json['error']}'); streamController.addError('Server error: ${json['error']}'); return; } - + // Try to extract content from non-streaming response if (json is Map && json.containsKey('choices')) { final choices = json['choices'] as List?; @@ -2608,7 +2653,9 @@ class ApiService { final message = choice['message'] as Map; final content = message['content']?.toString() ?? ''; if (content.isNotEmpty) { - debugPrint('DEBUG: Successfully extracted content from JSON response'); + debugPrint( + 'DEBUG: Successfully extracted content from JSON response', + ); // Stream the content word by word for better UX final words = content.split(' '); for (final word in words) { @@ -2619,15 +2666,17 @@ class ApiService { } } } - + // Log what we got if we couldn't extract content if (!streamController.isClosed) { debugPrint('DEBUG: JSON response structure: ${json.keys}'); debugPrint('DEBUG: Full JSON response: $json'); - + // Check if it's a task-based response if (json is Map && json.containsKey('task_id')) { - debugPrint('DEBUG: Got task-based response with task_id: ${json['task_id']}'); + debugPrint( + 'DEBUG: Got task-based response with task_id: ${json['task_id']}', + ); debugPrint('DEBUG: Status: ${json['status']}'); // This might be a polling-based async pattern // TODO: Implement polling for task completion @@ -2636,7 +2685,9 @@ class ApiService { } catch (e) { debugPrint('ERROR: Failed to parse JSON response: $e'); // Try to show something to the user - streamController.add('Response received but could not be parsed properly.'); + streamController.add( + 'Response received but could not be parsed properly.', + ); } } else { // Not JSON, might be plain text @@ -2645,14 +2696,14 @@ class ApiService { streamController.add(responseText); } } - + streamController.close(); return; } // Parse SSE stream using enhanced parser with heartbeat monitoring final rawStream = response.data.stream; - + // Handle the stream properly based on its actual type Stream> byteStream; if (rawStream is Stream) { @@ -2660,38 +2711,42 @@ class ApiService { } else { byteStream = rawStream as Stream>; } - + // Parse SSE events with enhanced parser (includes heartbeat monitoring) - final sseParser = SSEParser(heartbeatTimeout: const Duration(seconds: 45)); + final sseParser = SSEParser( + heartbeatTimeout: const Duration(seconds: 45), + ); int contentIndex = 0; int chunkSequence = 0; String accumulatedContent = ''; - + // Monitor parser heartbeat for reconnection sseParser.heartbeat.listen((_) { debugPrint('Persistent: SSE heartbeat timeout detected'); }); - + sseParser.reconnectRequests.listen((lastEventId) { - debugPrint('Persistent: SSE reconnection requested, lastEventId: $lastEventId'); + debugPrint( + 'Persistent: SSE reconnection requested, lastEventId: $lastEventId', + ); // The persistent service will handle the reconnection }); - + // Convert bytes to SSE events final sseEventStream = SSEParser.parseStream( byteStream, heartbeatTimeout: const Duration(seconds: 45), ); - + // Listen to the SSE event stream final streamSubscription = sseEventStream.listen( (event) { try { chunkSequence++; - + // Update parser with chunk data for heartbeat monitoring sseParser.feed(''); // Reset heartbeat timer - + // Process the event data if (persistentStreamId != null) { _processSseEvent( @@ -2703,10 +2758,13 @@ class ApiService { persistentStreamId, ); } - + // Update recovery state - recoveryService.updateStreamProgress(streamId, event.data, contentIndex++); - + recoveryService.updateStreamProgress( + streamId, + event.data, + contentIndex++, + ); } catch (e) { debugPrint('Persistent: Error processing SSE event: $e'); streamController.addError(e); @@ -2724,10 +2782,10 @@ class ApiService { }, onError: (error) async { debugPrint('Persistent: SSE stream error: $error'); - + // Try recovery through recovery service first final recoveredStream = await recoveryService.recoverStream(streamId); - + if (recoveredStream != null) { debugPrint('Persistent: Successfully recovered SSE stream'); recoveredStream.listen( @@ -2757,9 +2815,10 @@ class ApiService { streamController.addError(error); } }, - cancelOnError: false, // Continue processing despite individual event errors + cancelOnError: + false, // Continue processing despite individual event errors ); - + // Register with persistent streaming service now that subscription is created persistentStreamId = persistentService.registerStream( subscription: streamSubscription, @@ -2775,14 +2834,13 @@ class ApiService { 'requestData': data, }, ); - } catch (e) { debugPrint('Persistent: Failed to create SSE stream: $e'); if (persistentStreamId != null) { persistentService.unregisterStream(persistentStreamId); } recoveryService.unregisterStream(streamId); - + if (e is DioException && e.response?.statusCode == 401) { // Auth error - don't retry streamController.addError('Authentication failed'); @@ -2792,7 +2850,7 @@ class ApiService { } } } - + /// Process individual SSE events with content extraction and progress tracking void _processSseEvent( SSEEvent event, @@ -2802,8 +2860,10 @@ class ApiService { PersistentStreamingService persistentService, String persistentStreamId, ) { - debugPrint('Persistent: SSE event - type: ${event.event}, data: ${event.data}'); - + debugPrint( + 'Persistent: SSE event - type: ${event.event}, data: ${event.data}', + ); + // Handle completion signal if (event.data == '[DONE]') { debugPrint('Persistent: SSE stream finished with [DONE]'); @@ -2812,10 +2872,10 @@ class ApiService { } return; } - + try { final json = jsonDecode(event.data) as Map; - + // Handle errors if (json.containsKey('error')) { final error = json['error']; @@ -2823,110 +2883,113 @@ class ApiService { streamController.addError('Server error: $error'); return; } - - // Handle content streaming - if (json.containsKey('choices')) { - final choices = json['choices'] as List?; - if (choices != null && choices.isNotEmpty) { - final choice = choices[0] as Map; - - if (choice.containsKey('delta')) { - final delta = choice['delta'] as Map; - - // Extract content - if (delta.containsKey('content')) { - final content = delta['content'] as String?; - if (content != null && content.isNotEmpty) { - debugPrint('Persistent: SSE content chunk: "$content"'); - - // Add content to stream - if (!streamController.isClosed) { - streamController.add(content); - } - - // Update persistent service progress - persistentService.updateStreamProgress( - persistentStreamId, - chunkSequence: chunkSequence, - appendedContent: content, - ); - - accumulatedContent += content; - } - } - - // Check for completion in delta - if (delta.containsKey('finish_reason')) { - final finishReason = delta['finish_reason']; - debugPrint('Persistent: Stream finished with reason: $finishReason'); - if (!streamController.isClosed) { - streamController.close(); - } - return; - } - } else if (choice.containsKey('finish_reason')) { - // Check for completion at choice level - final finishReason = choice['finish_reason']; - if (finishReason != null) { - debugPrint('Persistent: Stream finished with reason: $finishReason'); - if (!streamController.isClosed) { - streamController.close(); - } - return; - } - } - } - } - - // Handle streaming chat/completions format variations - if (json.containsKey('delta')) { - final delta = json['delta'] as Map; - if (delta.containsKey('content')) { - final content = delta['content'] as String?; - if (content != null && content.isNotEmpty) { - debugPrint('Persistent: Direct delta content: "$content"'); - - if (!streamController.isClosed) { - streamController.add(content); - } - - persistentService.updateStreamProgress( - persistentStreamId, - chunkSequence: chunkSequence, - appendedContent: content, - ); - - accumulatedContent += content; - } - } - } - - // Handle OpenRouter-style streaming - if (json.containsKey('message')) { - final message = json['message'] as Map; - if (message.containsKey('content')) { - final content = message['content'] as String?; - if (content != null && content.isNotEmpty) { - debugPrint('Persistent: Message content: "$content"'); - - if (!streamController.isClosed) { - streamController.add(content); - } - - persistentService.updateStreamProgress( - persistentStreamId, - chunkSequence: chunkSequence, - content: content, // Full content, not appended - ); - } - } - } - - } catch (e) { - debugPrint('Persistent: Error parsing SSE event data: $e'); - // Don't fail the entire stream for one bad event + + // Handle content streaming + if (json.containsKey('choices')) { + final choices = json['choices'] as List?; + if (choices != null && choices.isNotEmpty) { + final choice = choices[0] as Map; + + if (choice.containsKey('delta')) { + final delta = choice['delta'] as Map; + + // Extract content + if (delta.containsKey('content')) { + final content = delta['content'] as String?; + if (content != null && content.isNotEmpty) { + debugPrint('Persistent: SSE content chunk: "$content"'); + + // Add content to stream + if (!streamController.isClosed) { + streamController.add(content); + } + + // Update persistent service progress + persistentService.updateStreamProgress( + persistentStreamId, + chunkSequence: chunkSequence, + appendedContent: content, + ); + + accumulatedContent += content; + } + } + + // Check for completion in delta + if (delta.containsKey('finish_reason')) { + final finishReason = delta['finish_reason']; + debugPrint( + 'Persistent: Stream finished with reason: $finishReason', + ); + if (!streamController.isClosed) { + streamController.close(); + } + return; + } + } else if (choice.containsKey('finish_reason')) { + // Check for completion at choice level + final finishReason = choice['finish_reason']; + if (finishReason != null) { + debugPrint( + 'Persistent: Stream finished with reason: $finishReason', + ); + if (!streamController.isClosed) { + streamController.close(); + } + return; + } + } + } } + + // Handle streaming chat/completions format variations + if (json.containsKey('delta')) { + final delta = json['delta'] as Map; + if (delta.containsKey('content')) { + final content = delta['content'] as String?; + if (content != null && content.isNotEmpty) { + debugPrint('Persistent: Direct delta content: "$content"'); + + if (!streamController.isClosed) { + streamController.add(content); + } + + persistentService.updateStreamProgress( + persistentStreamId, + chunkSequence: chunkSequence, + appendedContent: content, + ); + + accumulatedContent += content; + } + } + } + + // Handle OpenRouter-style streaming + if (json.containsKey('message')) { + final message = json['message'] as Map; + if (message.containsKey('content')) { + final content = message['content'] as String?; + if (content != null && content.isNotEmpty) { + debugPrint('Persistent: Message content: "$content"'); + + if (!streamController.isClosed) { + streamController.add(content); + } + + persistentService.updateStreamProgress( + persistentStreamId, + chunkSequence: chunkSequence, + content: content, // Full content, not appended + ); + } + } + } + } catch (e) { + debugPrint('Persistent: Error parsing SSE event data: $e'); + // Don't fail the entire stream for one bad event } + } // Legacy Socket.IO and older SSE methods removed