Merge pull request #110 from cogwheel0/feat-sse-stream-parser-and-function-calling-preference
feat-sse-stream-parser-and-function-calling-preference
This commit is contained in:
@@ -17,6 +17,7 @@ import '../error/api_error_interceptor.dart';
|
|||||||
// Tool-call details are parsed in the UI layer to render collapsible blocks
|
// Tool-call details are parsed in the UI layer to render collapsible blocks
|
||||||
import 'persistent_streaming_service.dart';
|
import 'persistent_streaming_service.dart';
|
||||||
import 'connectivity_service.dart';
|
import 'connectivity_service.dart';
|
||||||
|
import 'sse_stream_parser.dart';
|
||||||
import '../utils/debug_logger.dart';
|
import '../utils/debug_logger.dart';
|
||||||
import '../utils/openwebui_source_parser.dart';
|
import '../utils/openwebui_source_parser.dart';
|
||||||
|
|
||||||
@@ -3066,7 +3067,11 @@ class ApiService {
|
|||||||
final Map<String, CancelToken> _streamCancelTokens = {};
|
final Map<String, CancelToken> _streamCancelTokens = {};
|
||||||
final Map<String, String> _messagePersistentStreamIds = {};
|
final Map<String, String> _messagePersistentStreamIds = {};
|
||||||
|
|
||||||
// Send message using the background flow (socket push + polling fallback).
|
// Send message using dual-stream approach (HTTP SSE + WebSocket events).
|
||||||
|
// Matches OpenWebUI web client behavior:
|
||||||
|
// - HTTP SSE stream provides immediate content chunks
|
||||||
|
// - WebSocket events deliver metadata, tool status, sources, follow-ups
|
||||||
|
// - Both streams run in parallel for reliability
|
||||||
// Returns a record with (stream, messageId, sessionId, socketSessionId, isBackgroundFlow)
|
// Returns a record with (stream, messageId, sessionId, socketSessionId, isBackgroundFlow)
|
||||||
({
|
({
|
||||||
Stream<String> stream,
|
Stream<String> stream,
|
||||||
@@ -3088,6 +3093,7 @@ class ApiService {
|
|||||||
List<Map<String, dynamic>>? toolServers,
|
List<Map<String, dynamic>>? toolServers,
|
||||||
Map<String, dynamic>? backgroundTasks,
|
Map<String, dynamic>? backgroundTasks,
|
||||||
String? responseMessageId,
|
String? responseMessageId,
|
||||||
|
Map<String, dynamic>? userSettings,
|
||||||
}) {
|
}) {
|
||||||
final streamController = StreamController<String>();
|
final streamController = StreamController<String>();
|
||||||
|
|
||||||
@@ -3194,16 +3200,23 @@ class ApiService {
|
|||||||
data['tool_ids'] = toolIds;
|
data['tool_ids'] = toolIds;
|
||||||
_traceApi('Including tool_ids in streaming request: $toolIds');
|
_traceApi('Including tool_ids in streaming request: $toolIds');
|
||||||
|
|
||||||
// Hint server to use native function calling when tools are selected
|
// Respect user's function_calling preference from backend settings
|
||||||
// This enables provider-native tool execution paths and consistent UI events
|
// If not set, backend will default to 'default' mode (safer, more compatible)
|
||||||
try {
|
try {
|
||||||
|
final userParams = userSettings?['params'] as Map<String, dynamic>?;
|
||||||
|
final functionCallingMode = userParams?['function_calling'] as String?;
|
||||||
|
|
||||||
|
if (functionCallingMode != null) {
|
||||||
final params =
|
final params =
|
||||||
(data['params'] as Map<String, dynamic>?) ?? <String, dynamic>{};
|
(data['params'] as Map<String, dynamic>?) ?? <String, dynamic>{};
|
||||||
params['function_calling'] = 'native';
|
params['function_calling'] = functionCallingMode;
|
||||||
data['params'] = params;
|
data['params'] = params;
|
||||||
_traceApi('Set params.function_calling = native');
|
_traceApi('Set params.function_calling = $functionCallingMode (from user settings)');
|
||||||
|
} else {
|
||||||
|
_traceApi('No function_calling preference in user settings, backend will use default mode');
|
||||||
|
}
|
||||||
} catch (_) {
|
} catch (_) {
|
||||||
// Non-fatal; continue without forcing native mode
|
// Non-fatal; continue without setting function_calling mode
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -3219,7 +3232,7 @@ class ApiService {
|
|||||||
_traceApi('Including non-image files in request: ${allFiles.length}');
|
_traceApi('Including non-image files in request: ${allFiles.length}');
|
||||||
}
|
}
|
||||||
|
|
||||||
_traceApi('Preparing chat send request (backgroundFlow: true)');
|
_traceApi('Preparing dual-stream chat request (HTTP SSE + WebSocket)');
|
||||||
_traceApi('Model: $model');
|
_traceApi('Model: $model');
|
||||||
_traceApi('Message count: ${processedMessages.length}');
|
_traceApi('Message count: ${processedMessages.length}');
|
||||||
|
|
||||||
@@ -3232,10 +3245,10 @@ class ApiService {
|
|||||||
_traceApi('id value: ${data['id']}');
|
_traceApi('id value: ${data['id']}');
|
||||||
|
|
||||||
_traceApi(
|
_traceApi(
|
||||||
'Forcing background flow (hasBackgroundTasks: '
|
'Request features: hasBackgroundTasks=$hasBackgroundTasksPayload, '
|
||||||
'$hasBackgroundTasksPayload, tools: ${toolIds?.isNotEmpty == true}, '
|
'tools=${toolIds?.isNotEmpty == true}, '
|
||||||
'webSearch: $enableWebSearch, imageGen: $enableImageGeneration, '
|
'webSearch=$enableWebSearch, imageGen=$enableImageGeneration, '
|
||||||
'sessionOverride: ${sessionIdOverride != null && sessionIdOverride.isNotEmpty})',
|
'toolServers=${toolServers?.isNotEmpty == true}',
|
||||||
);
|
);
|
||||||
|
|
||||||
// Attach identifiers to trigger background task processing on the server
|
// Attach identifiers to trigger background task processing on the server
|
||||||
@@ -3259,27 +3272,89 @@ class ApiService {
|
|||||||
);
|
);
|
||||||
_traceApi('Has background_tasks: ${data.containsKey('background_tasks')}');
|
_traceApi('Has background_tasks: ${data.containsKey('background_tasks')}');
|
||||||
|
|
||||||
_traceApi('Initiating background tools flow (task-based)');
|
_traceApi('Initiating dual-stream request (HTTP SSE + WebSocket)');
|
||||||
_traceApi('Posting to /api/chat/completions');
|
_traceApi('Posting to /api/chat/completions');
|
||||||
|
|
||||||
// Fire in background; all updates will come via WebSocket events
|
// Create a cancel token for this request
|
||||||
|
final cancelToken = CancelToken();
|
||||||
|
_streamCancelTokens[messageId] = cancelToken;
|
||||||
|
|
||||||
|
// Start HTTP SSE stream (matches web client behavior)
|
||||||
|
// The WebSocket events will run in parallel via streaming_helper.dart
|
||||||
() async {
|
() async {
|
||||||
try {
|
try {
|
||||||
final resp = await _dio.post('/api/chat/completions', data: data);
|
final resp = await _dio.post(
|
||||||
|
'/api/chat/completions',
|
||||||
|
data: data,
|
||||||
|
options: Options(
|
||||||
|
responseType: ResponseType.stream,
|
||||||
|
headers: {
|
||||||
|
'Accept': 'text/event-stream',
|
||||||
|
},
|
||||||
|
),
|
||||||
|
cancelToken: cancelToken,
|
||||||
|
);
|
||||||
|
|
||||||
final respData = resp.data;
|
final respData = resp.data;
|
||||||
final taskId = (respData is Map)
|
|
||||||
? (respData['task_id']?.toString())
|
// Check if we got a task_id response (non-streaming)
|
||||||
: null;
|
if (respData is Map && respData['task_id'] != null) {
|
||||||
|
final taskId = respData['task_id'].toString();
|
||||||
_traceApi('Background task created: $taskId');
|
_traceApi('Background task created: $taskId');
|
||||||
|
|
||||||
// Close the controller immediately - all streaming will happen via WebSocket
|
// In this case, all streaming will happen via WebSocket
|
||||||
// No polling fallback to avoid duplication issues
|
// Close HTTP stream but keep WebSocket active
|
||||||
if (!streamController.isClosed) {
|
if (!streamController.isClosed) {
|
||||||
streamController.close();
|
streamController.close();
|
||||||
}
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// We have a streaming response body
|
||||||
|
if (respData is ResponseBody) {
|
||||||
|
_traceApi('HTTP SSE stream started for message: $messageId');
|
||||||
|
|
||||||
|
// Parse SSE stream and forward chunks to controller
|
||||||
|
await for (final chunk in SSEStreamParser.parseResponseStream(
|
||||||
|
respData,
|
||||||
|
splitLargeDeltas: false,
|
||||||
|
)) {
|
||||||
|
if (!streamController.isClosed) {
|
||||||
|
streamController.add(chunk);
|
||||||
|
} else {
|
||||||
|
_traceApi('Stream controller closed, stopping SSE parsing');
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_traceApi('HTTP SSE stream completed for message: $messageId');
|
||||||
|
} else {
|
||||||
|
_traceApi('Unexpected response type: ${respData.runtimeType}');
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close the HTTP stream controller
|
||||||
|
// WebSocket events will continue independently via streaming_helper
|
||||||
|
if (!streamController.isClosed) {
|
||||||
|
streamController.close();
|
||||||
|
}
|
||||||
|
} on DioException catch (e) {
|
||||||
|
if (CancelToken.isCancel(e)) {
|
||||||
|
_traceApi('HTTP stream cancelled for message: $messageId');
|
||||||
|
} else {
|
||||||
|
_traceApi('HTTP stream error: $e');
|
||||||
|
if (!streamController.isClosed) {
|
||||||
|
streamController.addError(e);
|
||||||
|
streamController.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
_traceApi('Background tools flow failed: $e');
|
_traceApi('Unexpected error in HTTP stream: $e');
|
||||||
if (!streamController.isClosed) streamController.close();
|
if (!streamController.isClosed) {
|
||||||
|
streamController.addError(e);
|
||||||
|
streamController.close();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
_streamCancelTokens.remove(messageId);
|
||||||
}
|
}
|
||||||
}();
|
}();
|
||||||
|
|
||||||
|
|||||||
172
lib/core/services/sse_stream_parser.dart
Normal file
172
lib/core/services/sse_stream_parser.dart
Normal file
@@ -0,0 +1,172 @@
|
|||||||
|
import 'dart:async';
|
||||||
|
import 'dart:convert';
|
||||||
|
import 'package:dio/dio.dart';
|
||||||
|
import '../utils/debug_logger.dart';
|
||||||
|
|
||||||
|
/// Parser for Server-Sent Events (SSE) streaming responses.
|
||||||
|
///
|
||||||
|
/// This matches the web client's EventSourceParserStream behavior,
|
||||||
|
/// parsing SSE data chunks and extracting OpenAI-compatible deltas.
|
||||||
|
class SSEStreamParser {
|
||||||
|
/// Parse an SSE response stream from Dio into text chunks.
|
||||||
|
///
|
||||||
|
/// Returns a stream of content strings extracted from OpenAI-style
|
||||||
|
/// completion chunks.
|
||||||
|
static Stream<String> parseResponseStream(
|
||||||
|
ResponseBody responseBody, {
|
||||||
|
bool splitLargeDeltas = false,
|
||||||
|
}) async* {
|
||||||
|
try {
|
||||||
|
// Buffer for accumulating incomplete SSE messages
|
||||||
|
String buffer = '';
|
||||||
|
|
||||||
|
await for (final chunk in responseBody.stream) {
|
||||||
|
// Convert bytes to string (Dio ResponseBody.stream always emits Uint8List)
|
||||||
|
final text = utf8.decode(chunk as List<int>, allowMalformed: true);
|
||||||
|
buffer += text;
|
||||||
|
|
||||||
|
// Process complete SSE messages (delimited by double newline)
|
||||||
|
final messages = buffer.split('\n\n');
|
||||||
|
|
||||||
|
// Keep the last (potentially incomplete) message in the buffer
|
||||||
|
buffer = messages.removeLast();
|
||||||
|
|
||||||
|
for (final message in messages) {
|
||||||
|
if (message.trim().isEmpty) continue;
|
||||||
|
|
||||||
|
// Parse SSE message
|
||||||
|
final content = _parseSSEMessage(message);
|
||||||
|
if (content != null) {
|
||||||
|
if (content == '[DONE]') {
|
||||||
|
// Stream completion signal
|
||||||
|
DebugLogger.stream('SSE stream completed with [DONE] signal');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Split large deltas into smaller chunks for smoother UI updates
|
||||||
|
if (splitLargeDeltas && content.length > 5) {
|
||||||
|
yield* _splitIntoChunks(content);
|
||||||
|
} else {
|
||||||
|
yield content;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process any remaining buffered data
|
||||||
|
if (buffer.trim().isNotEmpty) {
|
||||||
|
final content = _parseSSEMessage(buffer);
|
||||||
|
if (content != null && content != '[DONE]') {
|
||||||
|
yield content;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (e, stackTrace) {
|
||||||
|
DebugLogger.error(
|
||||||
|
'sse-parse-error',
|
||||||
|
scope: 'streaming/sse',
|
||||||
|
error: e,
|
||||||
|
stackTrace: stackTrace,
|
||||||
|
);
|
||||||
|
rethrow;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Parse a single SSE message and extract content.
|
||||||
|
static String? _parseSSEMessage(String message) {
|
||||||
|
try {
|
||||||
|
// SSE format: "data: <json>\n" or just the JSON
|
||||||
|
String dataLine = message.trim();
|
||||||
|
|
||||||
|
// Remove "data: " prefix if present
|
||||||
|
if (dataLine.startsWith('data: ')) {
|
||||||
|
dataLine = dataLine.substring(6).trim();
|
||||||
|
} else if (dataLine.startsWith('data:')) {
|
||||||
|
dataLine = dataLine.substring(5).trim();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle [DONE] signal
|
||||||
|
if (dataLine == '[DONE]' || dataLine == 'DONE') {
|
||||||
|
return '[DONE]';
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip empty data
|
||||||
|
if (dataLine.isEmpty) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse JSON
|
||||||
|
try {
|
||||||
|
final json = jsonDecode(dataLine) as Map<String, dynamic>;
|
||||||
|
|
||||||
|
// Handle errors
|
||||||
|
if (json['error'] != null) {
|
||||||
|
DebugLogger.error(
|
||||||
|
'sse-error-response',
|
||||||
|
scope: 'streaming/sse',
|
||||||
|
error: json['error'],
|
||||||
|
);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Extract content from OpenAI-style response
|
||||||
|
// Format: { choices: [{ delta: { content: "..." } }] }
|
||||||
|
final choices = json['choices'];
|
||||||
|
if (choices is List && choices.isNotEmpty) {
|
||||||
|
final choice = choices.first as Map<String, dynamic>?;
|
||||||
|
if (choice != null) {
|
||||||
|
final delta = choice['delta'] as Map<String, dynamic>?;
|
||||||
|
if (delta != null) {
|
||||||
|
final content = delta['content'];
|
||||||
|
if (content is String && content.isNotEmpty) {
|
||||||
|
return content;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Alternative format: { content: "..." }
|
||||||
|
final directContent = json['content'];
|
||||||
|
if (directContent is String && directContent.isNotEmpty) {
|
||||||
|
return directContent;
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
} on FormatException catch (e) {
|
||||||
|
DebugLogger.warning(
|
||||||
|
'Failed to parse SSE JSON: $dataLine',
|
||||||
|
data: {'error': e.toString()},
|
||||||
|
);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
DebugLogger.error(
|
||||||
|
'sse-message-parse-error',
|
||||||
|
scope: 'streaming/sse',
|
||||||
|
error: e,
|
||||||
|
data: {'message': message},
|
||||||
|
);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Split large content into smaller chunks for smoother streaming.
|
||||||
|
/// This matches the web client's streamLargeDeltasAsRandomChunks behavior.
|
||||||
|
static Stream<String> _splitIntoChunks(String content) async* {
|
||||||
|
var remaining = content;
|
||||||
|
|
||||||
|
while (remaining.isNotEmpty) {
|
||||||
|
// Random chunk size between 1-3 characters
|
||||||
|
final chunkSize = (remaining.length < 3)
|
||||||
|
? remaining.length
|
||||||
|
: 1 + (DateTime.now().millisecond % 3);
|
||||||
|
|
||||||
|
final chunk = remaining.substring(0, chunkSize);
|
||||||
|
yield chunk;
|
||||||
|
|
||||||
|
// Small delay for smoother visual effect (matching web client)
|
||||||
|
await Future.delayed(const Duration(milliseconds: 5));
|
||||||
|
|
||||||
|
remaining = remaining.substring(chunkSize);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1414,6 +1414,7 @@ Future<void> regenerateMessage(
|
|||||||
toolServers: toolServers,
|
toolServers: toolServers,
|
||||||
backgroundTasks: bgTasks,
|
backgroundTasks: bgTasks,
|
||||||
responseMessageId: assistantMessageId,
|
responseMessageId: assistantMessageId,
|
||||||
|
userSettings: userSettingsData,
|
||||||
);
|
);
|
||||||
|
|
||||||
final stream = response.stream;
|
final stream = response.stream;
|
||||||
@@ -1959,6 +1960,7 @@ Future<void> _sendMessageInternal(
|
|||||||
toolServers: toolServers,
|
toolServers: toolServers,
|
||||||
backgroundTasks: bgTasks,
|
backgroundTasks: bgTasks,
|
||||||
responseMessageId: assistantMessageId,
|
responseMessageId: assistantMessageId,
|
||||||
|
userSettings: userSettingsData,
|
||||||
);
|
);
|
||||||
|
|
||||||
final stream = response.stream;
|
final stream = response.stream;
|
||||||
|
|||||||
Reference in New Issue
Block a user