diff --git a/lib/core/services/api_service.dart b/lib/core/services/api_service.dart index 634c6f9..34e9152 100644 --- a/lib/core/services/api_service.dart +++ b/lib/core/services/api_service.dart @@ -17,6 +17,7 @@ import '../error/api_error_interceptor.dart'; // Tool-call details are parsed in the UI layer to render collapsible blocks import 'persistent_streaming_service.dart'; import 'connectivity_service.dart'; +import 'sse_stream_parser.dart'; import '../utils/debug_logger.dart'; import '../utils/openwebui_source_parser.dart'; @@ -3066,7 +3067,11 @@ class ApiService { final Map _streamCancelTokens = {}; final Map _messagePersistentStreamIds = {}; - // Send message using the background flow (socket push + polling fallback). + // Send message using dual-stream approach (HTTP SSE + WebSocket events). + // Matches OpenWebUI web client behavior: + // - HTTP SSE stream provides immediate content chunks + // - WebSocket events deliver metadata, tool status, sources, follow-ups + // - Both streams run in parallel for reliability // Returns a record with (stream, messageId, sessionId, socketSessionId, isBackgroundFlow) ({ Stream stream, @@ -3219,7 +3224,7 @@ class ApiService { _traceApi('Including non-image files in request: ${allFiles.length}'); } - _traceApi('Preparing chat send request (backgroundFlow: true)'); + _traceApi('Preparing dual-stream chat request (HTTP SSE + WebSocket)'); _traceApi('Model: $model'); _traceApi('Message count: ${processedMessages.length}'); @@ -3232,10 +3237,10 @@ class ApiService { _traceApi('id value: ${data['id']}'); _traceApi( - 'Forcing background flow (hasBackgroundTasks: ' - '$hasBackgroundTasksPayload, tools: ${toolIds?.isNotEmpty == true}, ' - 'webSearch: $enableWebSearch, imageGen: $enableImageGeneration, ' - 'sessionOverride: ${sessionIdOverride != null && sessionIdOverride.isNotEmpty})', + 'Request features: hasBackgroundTasks=$hasBackgroundTasksPayload, ' + 'tools=${toolIds?.isNotEmpty == true}, ' + 'webSearch=$enableWebSearch, 
imageGen=$enableImageGeneration, ' + 'toolServers=${toolServers?.isNotEmpty == true}', ); // Attach identifiers to trigger background task processing on the server @@ -3259,27 +3264,89 @@ class ApiService { ); _traceApi('Has background_tasks: ${data.containsKey('background_tasks')}'); - _traceApi('Initiating background tools flow (task-based)'); + _traceApi('Initiating dual-stream request (HTTP SSE + WebSocket)'); _traceApi('Posting to /api/chat/completions'); - // Fire in background; all updates will come via WebSocket events + // Create a cancel token for this request + final cancelToken = CancelToken(); + _streamCancelTokens[messageId] = cancelToken; + + // Start HTTP SSE stream (matches web client behavior) + // The WebSocket events will run in parallel via streaming_helper.dart () async { try { - final resp = await _dio.post('/api/chat/completions', data: data); - final respData = resp.data; - final taskId = (respData is Map) - ? (respData['task_id']?.toString()) - : null; - _traceApi('Background task created: $taskId'); + final resp = await _dio.post( + '/api/chat/completions', + data: data, + options: Options( + responseType: ResponseType.stream, + headers: { + 'Accept': 'text/event-stream', + }, + ), + cancelToken: cancelToken, + ); - // Close the controller immediately - all streaming will happen via WebSocket - // No polling fallback to avoid duplication issues + final respData = resp.data; + + // Check if we got a task_id response (non-streaming) + if (respData is Map && respData['task_id'] != null) { + final taskId = respData['task_id'].toString(); + _traceApi('Background task created: $taskId'); + + // In this case, all streaming will happen via WebSocket + // Close HTTP stream but keep WebSocket active + if (!streamController.isClosed) { + streamController.close(); + } + return; + } + + // We have a streaming response body + if (respData is ResponseBody) { + _traceApi('HTTP SSE stream started for message: $messageId'); + + // Parse SSE stream and 
/// Parser for Server-Sent Events (SSE) streaming responses.
///
/// Decodes the byte stream of a Dio [ResponseBody], frames it into SSE
/// events, and extracts the text deltas carried by OpenAI-style completion
/// chunks.
class SSEStreamParser {
  /// Value returned by [_parseSSEMessage] for the end-of-stream event.
  static const String _doneSignal = '[DONE]';

  /// Matches SSE field lines that carry no payload (`event`, `id`, `retry`).
  static final RegExp _nonDataField = RegExp(r'^(?:event|id|retry)(?::|$)');

  /// Parses an SSE response stream from Dio into text chunks.
  ///
  /// Yields the content strings extracted from each event and stops as soon
  /// as a `[DONE]` event is seen. When [splitLargeDeltas] is true, content
  /// longer than five UTF-16 code units is re-emitted as a sequence of small
  /// pieces (see [_splitIntoChunks]).
  ///
  /// Errors raised while reading or decoding the stream are logged and then
  /// rethrown to the listener.
  static Stream<String> parseResponseStream(
    ResponseBody responseBody, {
    bool splitLargeDeltas = false,
  }) async* {
    try {
      // Decode and split in chunked mode so that neither a multi-byte UTF-8
      // sequence nor a CRLF pair is corrupted when it straddles two network
      // chunks. LineSplitter accepts "\n", "\r\n" and "\r" terminators.
      final lines = const LineSplitter().bind(
        const Utf8Decoder(allowMalformed: true).bind(responseBody.stream),
      );

      // Lines of the event currently being accumulated.
      final pending = <String>[];

      await for (final line in lines) {
        if (line.isNotEmpty) {
          pending.add(line);
          continue;
        }

        // An empty line terminates the current event.
        if (pending.isEmpty) continue;
        final content = _parseSSEMessage(pending.join('\n'));
        pending.clear();
        if (content == null) continue;

        if (content == _doneSignal) {
          // Stream completion signal; returning cancels the subscription.
          DebugLogger.stream('SSE stream completed with [DONE] signal');
          return;
        }

        // Split large deltas into smaller chunks for smoother UI updates.
        if (splitLargeDeltas && content.length > 5) {
          yield* _splitIntoChunks(content);
        } else {
          yield content;
        }
      }

      // The stream ended without a trailing blank line: still deliver the
      // last event. It is emitted whole, without splitting.
      if (pending.isNotEmpty) {
        final content = _parseSSEMessage(pending.join('\n'));
        if (content != null && content != _doneSignal) {
          yield content;
        }
      }
    } catch (e, stackTrace) {
      DebugLogger.error(
        'sse-parse-error',
        scope: 'streaming/sse',
        error: e,
        stackTrace: stackTrace,
      );
      rethrow;
    }
  }

  /// Parses a single SSE event and extracts its text content.
  ///
  /// Returns `'[DONE]'` for the completion signal, the delta text for a
  /// content chunk, and `null` for anything that carries no text (empty
  /// events, error payloads, unparseable or unexpected JSON). Never throws:
  /// failures are logged and reported as `null`.
  static String? _parseSSEMessage(String message) {
    try {
      final payload = _extractData(message);

      // Handle [DONE] signal.
      if (payload == _doneSignal || payload == 'DONE') {
        return _doneSignal;
      }

      // Skip empty data.
      if (payload.isEmpty) {
        return null;
      }

      try {
        final decoded = jsonDecode(payload);
        if (decoded is! Map) {
          DebugLogger.warning(
            'Ignoring non-object SSE payload',
            data: {'type': decoded.runtimeType.toString()},
          );
          return null;
        }

        // Error payloads are logged and produce no content.
        final error = decoded['error'];
        if (error != null) {
          DebugLogger.error(
            'sse-error-response',
            scope: 'streaming/sse',
            error: error,
          );
          return null;
        }

        // OpenAI-style format: { choices: [{ delta: { content: "..." } }] }
        final choices = decoded['choices'];
        if (choices is List && choices.isNotEmpty) {
          final first = choices.first;
          if (first is Map) {
            final delta = first['delta'];
            if (delta is Map) {
              final content = delta['content'];
              if (content is String && content.isNotEmpty) {
                return content;
              }
            }
          }
        }

        // Alternative format: { content: "..." }
        final directContent = decoded['content'];
        if (directContent is String && directContent.isNotEmpty) {
          return directContent;
        }

        return null;
      } on FormatException catch (e) {
        DebugLogger.warning(
          'Failed to parse SSE JSON: $payload',
          data: {'error': e.toString()},
        );
        return null;
      }
    } catch (e, stackTrace) {
      // Last-resort guard so a single bad event cannot abort the stream.
      DebugLogger.error(
        'sse-message-parse-error',
        scope: 'streaming/sse',
        error: e,
        stackTrace: stackTrace,
        data: {'message': message},
      );
      return null;
    }
  }

  /// Returns the data payload of one SSE event.
  ///
  /// Comment lines (starting with `:`) and `event`/`id`/`retry` fields are
  /// dropped, the `data:` prefix is removed from data lines, and lines with
  /// no recognised field are kept as-is so that a bare JSON body without any
  /// `data:` prefix is still accepted. Multiple payload lines are joined with
  /// a newline.
  static String _extractData(String message) {
    final dataLines = <String>[];
    for (final rawLine in const LineSplitter().convert(message)) {
      final line = rawLine.trim();
      if (line.isEmpty || line.startsWith(':')) continue;
      if (line.startsWith('data:')) {
        dataLines.add(line.substring(5).trim());
      } else if (!_nonDataField.hasMatch(line)) {
        dataLines.add(line);
      }
    }
    return dataLines.join('\n').trim();
  }

  /// Splits large content into smaller chunks for smoother streaming.
  ///
  /// Emits pieces of one to three code points, pausing 5 ms after each piece.
  /// A remainder shorter than three code points is emitted in one piece.
  /// Slicing is done on code points rather than UTF-16 code units so that a
  /// surrogate pair (for example an emoji) is never cut in half.
  static Stream<String> _splitIntoChunks(String content) async* {
    final codePoints = content.runes.toList(growable: false);
    var offset = 0;

    while (offset < codePoints.length) {
      final left = codePoints.length - offset;

      // Clock-derived size in 1..3; the whole remainder when fewer than
      // three code points are left.
      final size = left < 3 ? left : 1 + (DateTime.now().millisecond % 3);

      yield String.fromCharCodes(codePoints, offset, offset + size);
      offset += size;

      // Small delay for a smoother visual effect.
      await Future<void>.delayed(const Duration(milliseconds: 5));
    }
  }
}