feat(sse): add SSEStreamParser to parse Dio SSE streams and extract

This commit is contained in:
cogwheel0
2025-10-26 22:33:53 +05:30
parent 551e844c03
commit ae48fcc035
2 changed files with 257 additions and 18 deletions

View File

@@ -17,6 +17,7 @@ import '../error/api_error_interceptor.dart';
// Tool-call details are parsed in the UI layer to render collapsible blocks
import 'persistent_streaming_service.dart';
import 'connectivity_service.dart';
import 'sse_stream_parser.dart';
import '../utils/debug_logger.dart';
import '../utils/openwebui_source_parser.dart';
@@ -3066,7 +3067,11 @@ class ApiService {
final Map<String, CancelToken> _streamCancelTokens = {};
final Map<String, String> _messagePersistentStreamIds = {};
// Send message using the background flow (socket push + polling fallback).
// Send message using dual-stream approach (HTTP SSE + WebSocket events).
// Matches OpenWebUI web client behavior:
// - HTTP SSE stream provides immediate content chunks
// - WebSocket events deliver metadata, tool status, sources, follow-ups
// - Both streams run in parallel for reliability
// Returns a record with (stream, messageId, sessionId, socketSessionId, isBackgroundFlow)
({
Stream<String> stream,
@@ -3219,7 +3224,7 @@ class ApiService {
_traceApi('Including non-image files in request: ${allFiles.length}');
}
_traceApi('Preparing chat send request (backgroundFlow: true)');
_traceApi('Preparing dual-stream chat request (HTTP SSE + WebSocket)');
_traceApi('Model: $model');
_traceApi('Message count: ${processedMessages.length}');
@@ -3232,10 +3237,10 @@ class ApiService {
_traceApi('id value: ${data['id']}');
_traceApi(
'Forcing background flow (hasBackgroundTasks: '
'$hasBackgroundTasksPayload, tools: ${toolIds?.isNotEmpty == true}, '
'webSearch: $enableWebSearch, imageGen: $enableImageGeneration, '
'sessionOverride: ${sessionIdOverride != null && sessionIdOverride.isNotEmpty})',
'Request features: hasBackgroundTasks=$hasBackgroundTasksPayload, '
'tools=${toolIds?.isNotEmpty == true}, '
'webSearch=$enableWebSearch, imageGen=$enableImageGeneration, '
'toolServers=${toolServers?.isNotEmpty == true}',
);
// Attach identifiers to trigger background task processing on the server
@@ -3259,27 +3264,89 @@ class ApiService {
);
_traceApi('Has background_tasks: ${data.containsKey('background_tasks')}');
_traceApi('Initiating background tools flow (task-based)');
_traceApi('Initiating dual-stream request (HTTP SSE + WebSocket)');
_traceApi('Posting to /api/chat/completions');
// Fire in background; all updates will come via WebSocket events
// Create a cancel token for this request
final cancelToken = CancelToken();
_streamCancelTokens[messageId] = cancelToken;
// Start HTTP SSE stream (matches web client behavior)
// The WebSocket events will run in parallel via streaming_helper.dart
() async {
try {
final resp = await _dio.post('/api/chat/completions', data: data);
final respData = resp.data;
final taskId = (respData is Map)
? (respData['task_id']?.toString())
: null;
_traceApi('Background task created: $taskId');
final resp = await _dio.post(
'/api/chat/completions',
data: data,
options: Options(
responseType: ResponseType.stream,
headers: {
'Accept': 'text/event-stream',
},
),
cancelToken: cancelToken,
);
// Close the controller immediately - all streaming will happen via WebSocket
// No polling fallback to avoid duplication issues
final respData = resp.data;
// Check if we got a task_id response (non-streaming)
if (respData is Map && respData['task_id'] != null) {
final taskId = respData['task_id'].toString();
_traceApi('Background task created: $taskId');
// In this case, all streaming will happen via WebSocket
// Close HTTP stream but keep WebSocket active
if (!streamController.isClosed) {
streamController.close();
}
return;
}
// We have a streaming response body
if (respData is ResponseBody) {
_traceApi('HTTP SSE stream started for message: $messageId');
// Parse SSE stream and forward chunks to controller
await for (final chunk in SSEStreamParser.parseResponseStream(
respData,
splitLargeDeltas: false,
)) {
if (!streamController.isClosed) {
streamController.add(chunk);
} else {
_traceApi('Stream controller closed, stopping SSE parsing');
break;
}
}
_traceApi('HTTP SSE stream completed for message: $messageId');
} else {
_traceApi('Unexpected response type: ${respData.runtimeType}');
}
// Close the HTTP stream controller
// WebSocket events will continue independently via streaming_helper
if (!streamController.isClosed) {
streamController.close();
}
} on DioException catch (e) {
if (CancelToken.isCancel(e)) {
_traceApi('HTTP stream cancelled for message: $messageId');
} else {
_traceApi('HTTP stream error: $e');
if (!streamController.isClosed) {
streamController.addError(e);
streamController.close();
}
}
} catch (e) {
_traceApi('Background tools flow failed: $e');
if (!streamController.isClosed) streamController.close();
_traceApi('Unexpected error in HTTP stream: $e');
if (!streamController.isClosed) {
streamController.addError(e);
streamController.close();
}
} finally {
_streamCancelTokens.remove(messageId);
}
}();