refactor: sse cleanup

This commit is contained in:
cogwheel0
2025-09-02 00:13:30 +05:30
parent 66935d1b0f
commit 1ce981937d
3 changed files with 73 additions and 1312 deletions

View File

@@ -629,10 +629,11 @@ class ApiService {
final toolCalls = (msgData['tool_calls'] is List)
? (msgData['tool_calls'] as List)
: (historyMsg != null && historyMsg['tool_calls'] is List)
? (historyMsg['tool_calls'] as List)
: null;
? (historyMsg['tool_calls'] as List)
: null;
if ((msgData['role']?.toString() == 'assistant') && toolCalls is List) {
if ((msgData['role']?.toString() == 'assistant') &&
toolCalls is List) {
// Collect subsequent tool results associated with this assistant turn
final List<Map<String, dynamic>> results = [];
int j = idx + 1;
@@ -674,7 +675,10 @@ class ApiService {
}
// Default path: parse message as-is
final message = _parseOpenWebUIMessage(msgData, historyMsg: historyMsg);
final message = _parseOpenWebUIMessage(
msgData,
historyMsg: historyMsg,
);
messages.add(message);
debugPrint(
'DEBUG: Successfully parsed message: ${message.id} - ${message.role}',
@@ -715,7 +719,8 @@ class ApiService {
// Prefer richer content from history entry if present
dynamic content = msgData['content'];
if ((content == null || (content is String && content.isEmpty)) &&
historyMsg != null && historyMsg['content'] != null) {
historyMsg != null &&
historyMsg['content'] != null) {
content = historyMsg['content'];
}
String contentString;
@@ -741,8 +746,8 @@ class ApiService {
final toolCallsList = (msgData['tool_calls'] is List)
? (msgData['tool_calls'] as List)
: (historyMsg != null && historyMsg['tool_calls'] is List)
? (historyMsg['tool_calls'] as List)
: null;
? (historyMsg['tool_calls'] as List)
: null;
if (contentString.trim().isEmpty && toolCallsList is List) {
final synthesized = _synthesizeToolDetailsFromToolCalls(toolCallsList);
if (synthesized.isNotEmpty) {
@@ -824,11 +829,15 @@ class ApiService {
for (final c in toolCalls) {
if (c is! Map) continue;
final func = c['function'] as Map?;
final name = (func != null ? func['name'] : c['name'])?.toString() ?? 'tool';
final id = (c['id']?.toString() ?? 'call_${DateTime.now().millisecondsSinceEpoch}');
final name =
(func != null ? func['name'] : c['name'])?.toString() ?? 'tool';
final id =
(c['id']?.toString() ??
'call_${DateTime.now().millisecondsSinceEpoch}');
final done = (c['done']?.toString() ?? 'true');
final argsRaw = func != null ? func['arguments'] : c['arguments'];
final resRaw = c['result'] ?? c['output'] ?? (func != null ? func['result'] : null);
final resRaw =
c['result'] ?? c['output'] ?? (func != null ? func['result'] : null);
final argsStr = _jsonStringify(argsRaw);
final resStr = resRaw != null ? _jsonStringify(resRaw) : null;
final attrs = StringBuffer()
@@ -840,7 +849,9 @@ class ApiService {
if (resStr != null && resStr.isNotEmpty) {
attrs.write(' result="${_escapeHtmlAttr(resStr)}"');
}
buf.writeln('<details ${attrs.toString()}><summary>Tool Executed</summary>');
buf.writeln(
'<details ${attrs.toString()}><summary>Tool Executed</summary>',
);
buf.writeln('</details>');
}
return buf.toString().trim();
@@ -860,8 +871,11 @@ class ApiService {
for (final c in toolCalls) {
if (c is! Map) continue;
final func = c['function'] as Map?;
final name = (func != null ? func['name'] : c['name'])?.toString() ?? 'tool';
final id = (c['id']?.toString() ?? 'call_${DateTime.now().millisecondsSinceEpoch}');
final name =
(func != null ? func['name'] : c['name'])?.toString() ?? 'tool';
final id =
(c['id']?.toString() ??
'call_${DateTime.now().millisecondsSinceEpoch}');
final argsRaw = func != null ? func['arguments'] : c['arguments'];
final argsStr = _jsonStringify(argsRaw);
final resultEntry = resultsMap[id];
@@ -872,7 +886,9 @@ class ApiService {
final attrs = StringBuffer()
..write('type="tool_calls"')
..write(' done="${_escapeHtmlAttr(resultEntry != null ? 'true' : 'false')}"')
..write(
' done="${_escapeHtmlAttr(resultEntry != null ? 'true' : 'false')}"',
)
..write(' id="${_escapeHtmlAttr(id)}"')
..write(' name="${_escapeHtmlAttr(name)}"')
..write(' arguments="${_escapeHtmlAttr(argsStr)}"');
@@ -883,7 +899,9 @@ class ApiService {
attrs.write(' files="${_escapeHtmlAttr(filesStr)}"');
}
buf.writeln('<details ${attrs.toString()}><summary>${resultEntry != null ? 'Tool Executed' : 'Executing...'}</summary>');
buf.writeln(
'<details ${attrs.toString()}><summary>${resultEntry != null ? 'Tool Executed' : 'Executing...'}</summary>',
);
buf.writeln('</details>');
}
return buf.toString().trim();
@@ -897,14 +915,19 @@ class ApiService {
if (type == null) continue;
// OpenWebUI content-blocks shape: { type: 'tool_calls', content: [...], results: [...] }
if (type == 'tool_calls') {
final calls = (item['content'] is List) ? (item['content'] as List) : <dynamic>[];
final calls = (item['content'] is List)
? (item['content'] as List)
: <dynamic>[];
final results = <Map<String, dynamic>>[];
if (item['results'] is List) {
for (final r in (item['results'] as List)) {
if (r is Map<String, dynamic>) results.add(r);
}
}
final synthesized = _synthesizeToolDetailsFromToolCallsWithResults(calls, results);
final synthesized = _synthesizeToolDetailsFromToolCallsWithResults(
calls,
results,
);
if (synthesized.isNotEmpty) buf.writeln(synthesized);
continue;
}
@@ -912,12 +935,16 @@ class ApiService {
// Heuristics: handle other variants (single tool/function call entries)
if (type == 'tool_call' || type == 'function_call') {
final name = (item['name'] ?? item['tool'] ?? 'tool').toString();
final id = (item['id']?.toString() ?? 'call_${DateTime.now().millisecondsSinceEpoch}');
final id =
(item['id']?.toString() ??
'call_${DateTime.now().millisecondsSinceEpoch}');
final argsStr = _jsonStringify(item['arguments'] ?? item['args']);
final resStr = item['result'] ?? item['output'] ?? item['response'];
final attrs = StringBuffer()
..write('type="tool_calls"')
..write(' done="${_escapeHtmlAttr(resStr != null ? 'true' : 'false')}"')
..write(
' done="${_escapeHtmlAttr(resStr != null ? 'true' : 'false')}"',
)
..write(' id="${_escapeHtmlAttr(id)}"')
..write(' name="${_escapeHtmlAttr(name)}"')
..write(' arguments="${_escapeHtmlAttr(argsStr)}"');
@@ -925,7 +952,9 @@ class ApiService {
final r = _jsonStringify(resStr);
if (r.isNotEmpty) attrs.write(' result="${_escapeHtmlAttr(r)}"');
}
buf.writeln('<details ${attrs.toString()}><summary>${resStr != null ? 'Tool Executed' : 'Executing...'}</summary>');
buf.writeln(
'<details ${attrs.toString()}><summary>${resStr != null ? 'Tool Executed' : 'Executing...'}</summary>',
);
buf.writeln('</details>');
}
}
@@ -2589,7 +2618,8 @@ class ApiService {
// Generate unique IDs
final messageId = const Uuid().v4();
final sessionId = (sessionIdOverride != null && sessionIdOverride.isNotEmpty)
final sessionId =
(sessionIdOverride != null && sessionIdOverride.isNotEmpty)
? sessionIdOverride
: const Uuid().v4().substring(0, 20);
@@ -2679,7 +2709,8 @@ class ApiService {
// It allows the client to display a collapsible "Thinking" section.
data['params'] = {
'reasoning_tags': true,
'reasoning_effort': 'medium', // Safe default; providers ignore if unsupported
'reasoning_effort':
'medium', // Safe default; providers ignore if unsupported
};
// Add tool_ids if provided (Open-WebUI expects tool_ids as array of strings)
@@ -2690,7 +2721,8 @@ class ApiService {
// Hint server to use native function calling when tools are selected
// This enables provider-native tool execution paths and consistent UI events
try {
final params = (data['params'] as Map<String, dynamic>? ) ?? <String, dynamic>{};
final params =
(data['params'] as Map<String, dynamic>?) ?? <String, dynamic>{};
params['function_calling'] = 'native';
data['params'] = params;
debugPrint('DEBUG: Set params.function_calling = native');
@@ -2702,7 +2734,9 @@ class ApiService {
// Include tool_servers if provided (for native function calling with OpenAPI servers)
if (toolServers != null && toolServers.isNotEmpty) {
data['tool_servers'] = toolServers;
debugPrint('DEBUG: Including tool_servers in request (${toolServers.length})');
debugPrint(
'DEBUG: Including tool_servers in request (${toolServers.length})',
);
}
// Include non-image files at the top level as expected by Open WebUI
@@ -2761,7 +2795,9 @@ class ApiService {
try {
final resp = await _dio.post('/api/chat/completions', data: data);
final respData = resp.data;
final taskId = (respData is Map) ? (respData['task_id']?.toString()) : null;
final taskId = (respData is Map)
? (respData['task_id']?.toString())
: null;
debugPrint('DEBUG: Background task created: $taskId');
// If no session/socket provided, fall back to polling for updates.
@@ -2838,8 +2874,9 @@ class ApiService {
// Locate assistant content from multiple shapes
String content = '';
Map<String, dynamic>? chatObj =
(data['chat'] is Map<String, dynamic>) ? data['chat'] as Map<String, dynamic> : null;
Map<String, dynamic>? chatObj = (data['chat'] is Map<String, dynamic>)
? data['chat'] as Map<String, dynamic>
: null;
// 1) Preferred: chat.messages (list) try exact id first
if (chatObj != null && chatObj['messages'] is List) {
@@ -2941,10 +2978,12 @@ class ApiService {
if (content.isEmpty && chatObj != null) {
final history = chatObj['history'];
if (history is Map && history['messages'] is Map) {
final Map<dynamic, dynamic> msgMapDyn = history['messages'] as Map;
final Map<dynamic, dynamic> msgMapDyn =
history['messages'] as Map;
// Iterate by values; no guaranteed ordering, but often sufficient
for (final entry in msgMapDyn.values) {
if (entry is Map && (entry['role']?.toString() == 'assistant')) {
if (entry is Map &&
(entry['role']?.toString() == 'assistant')) {
final rawContent = entry['content'];
if (rawContent is String) {
content = rawContent;
@@ -3024,665 +3063,6 @@ class ApiService {
}
} catch (_) {}
}
// SSE helpers removed: background task flow is the only path now.
/* void _streamSSE(
Map<String, dynamic> data,
StreamController<String> streamController,
String messageId,
) async {
final persistentService = PersistentStreamingService();
final recoveryService = StreamRecoveryService();
final streamId = DateTime.now().millisecondsSinceEpoch.toString();
// Create a cancel token for this SSE request and store it by message
final cancelToken = CancelToken();
_streamCancelTokens[messageId] = cancelToken;
// Extract metadata for recovery
final conversationId = data['conversation_id'] ?? data['chat_id'] ?? '';
final sessionId = data['session_id'] ?? const Uuid().v4().substring(0, 20);
// Register stream for recovery
recoveryService.registerStream(
streamId,
StreamRecoveryState(
baseUrl: serverConfig.url,
endpoint: '/api/chat/completions',
originalRequest: data,
headers: {
'Authorization': 'Bearer ${_authInterceptor.authToken}',
'Accept': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
},
),
);
// Recovery callback for persistent service
Future<void> recoveryCallback() async {
debugPrint('Persistent: Attempting to recover stream $streamId');
// Restart the streaming request
_streamSSE(data, streamController, messageId);
}
// Declare variables that need to be accessible in catch block
String? persistentStreamId;
try {
debugPrint(
'DEBUG: Making SSE request with parser to /api/chat/completions',
);
// Create a fresh Dio instance optimized for SSE streaming
final streamDio = Dio(
BaseOptions(
baseUrl: serverConfig.url,
connectTimeout: const Duration(
seconds: 60,
), // Longer for initial connection
receiveTimeout: null, // No timeout for streaming
sendTimeout: const Duration(seconds: 30),
headers: {
'Authorization': 'Bearer ${_authInterceptor.authToken}',
'Accept': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
...serverConfig.customHeaders, // Include any custom headers
},
validateStatus: (status) => status != null && status < 400,
followRedirects: true,
maxRedirects: 3,
),
);
DebugLogger.log('Sending SSE request with data structure logged');
final response = await streamDio.post(
'/api/chat/completions',
data: data, // Pass data directly as Map
options: Options(
responseType: ResponseType.stream,
receiveTimeout: null,
),
cancelToken: cancelToken,
);
debugPrint('DEBUG: SSE response status: ${response.statusCode}');
debugPrint('DEBUG: SSE response headers: ${response.headers}');
debugPrint(
'DEBUG: SSE content-type: ${response.headers.value('content-type')}',
);
if (response.statusCode != 200) {
throw Exception(
'HTTP ${response.statusCode}: Failed to start streaming',
);
}
// Check if we got SSE or JSON response
final contentType = response.headers.value('content-type') ?? '';
if (!contentType.contains('text/event-stream')) {
debugPrint('WARNING: Expected SSE but got content-type: $contentType');
debugPrint(
'WARNING: This usually means the server didn\'t receive the streaming parameters',
);
// Try to read the response to see what we got
final stream = response.data.stream as Stream<List<int>>;
final bytes = await stream.toList();
final fullBytes = bytes.expand((x) => x).toList();
final responseText = utf8.decode(fullBytes);
debugPrint('DEBUG: Non-SSE response length: ${responseText.length}');
// If it's JSON, parse and handle it
if (contentType.contains('application/json')) {
try {
final json = jsonDecode(responseText);
// Check if it's an error
if (json is Map && json.containsKey('error')) {
debugPrint('ERROR: Server returned error: ${json['error']}');
streamController.addError('Server error: ${json['error']}');
return;
}
// Try to extract content from non-streaming response
if (json is Map && json.containsKey('choices')) {
final choices = json['choices'] as List?;
if (choices != null && choices.isNotEmpty) {
final choice = choices[0] as Map<String, dynamic>;
if (choice.containsKey('message')) {
final message = choice['message'] as Map<String, dynamic>;
final content = message['content']?.toString() ?? '';
if (content.isNotEmpty) {
debugPrint(
'DEBUG: Successfully extracted content from JSON response',
);
// Stream the content word by word for better UX
final words = content.split(' ');
for (final word in words) {
streamController.add('$word ');
await Future.delayed(const Duration(milliseconds: 20));
}
}
}
}
}
// Log what we got if we couldn't extract content
if (!streamController.isClosed) {
DebugLogger.log('JSON response structure: ${json.keys}');
DebugLogger.log('JSON response received (full data suppressed)');
// Check if it's a task-based response
if (json is Map && json.containsKey('task_id')) {
debugPrint(
'DEBUG: Got task-based response with task_id: ${json['task_id']}',
);
debugPrint('DEBUG: Status: ${json['status']}');
// This might be a polling-based async pattern
// TODO: Implement polling for task completion
}
}
} catch (e) {
debugPrint('ERROR: Failed to parse JSON response: $e');
// Try to show something to the user
streamController.add(
'Response received but could not be parsed properly.',
);
}
} else {
// Not JSON, might be plain text
debugPrint('DEBUG: Got non-JSON response, treating as plain text');
if (responseText.isNotEmpty && responseText.length < 10000) {
streamController.add(responseText);
}
}
streamController.close();
return;
}
// Parse SSE stream using enhanced parser with heartbeat monitoring
final rawStream = response.data.stream;
// Handle the stream properly based on its actual type
Stream<List<int>> byteStream;
if (rawStream is Stream<Uint8List>) {
byteStream = rawStream.map((uint8list) => uint8list.toList());
} else {
byteStream = rawStream as Stream<List<int>>;
}
// Parse SSE events with enhanced parser (includes heartbeat monitoring)
final sseParser = SSEParser(
heartbeatTimeout: const Duration(seconds: 45),
);
int contentIndex = 0;
int chunkSequence = 0;
String accumulatedContent = '';
// Monitor parser heartbeat for reconnection
sseParser.heartbeat.listen((_) {
debugPrint('Persistent: SSE heartbeat timeout detected');
});
sseParser.reconnectRequests.listen((lastEventId) {
debugPrint(
'Persistent: SSE reconnection requested, lastEventId: $lastEventId',
);
// The persistent service will handle the reconnection
});
// Convert bytes to SSE events
final sseEventStream = SSEParser.parseStream(
byteStream,
heartbeatTimeout: const Duration(seconds: 45),
);
// Listen to the SSE event stream
final streamSubscription = sseEventStream.listen(
(event) {
try {
chunkSequence++;
// Update parser with chunk data for heartbeat monitoring
sseParser.feed(''); // Reset heartbeat timer
// Process the event data
if (persistentStreamId != null) {
_processSseEvent(
event,
streamController,
chunkSequence,
accumulatedContent,
persistentService,
persistentStreamId,
);
}
// Update recovery state
recoveryService.updateStreamProgress(
streamId,
event.data,
contentIndex++,
);
} catch (e) {
debugPrint('Persistent: Error processing SSE event: $e');
streamController.addError(e);
}
},
onDone: () {
debugPrint('Persistent: SSE stream completed normally');
if (persistentStreamId != null) {
persistentService.unregisterStream(persistentStreamId);
}
recoveryService.unregisterStream(streamId);
_streamCancelTokens.remove(messageId);
_messagePersistentStreamIds.remove(messageId);
if (!streamController.isClosed) {
streamController.close();
}
},
onError: (error) async {
debugPrint('Persistent: SSE stream error: $error');
// If this was a user cancellation, close quietly
if (error is DioException && error.type == DioExceptionType.cancel) {
if (persistentStreamId != null) {
persistentService.unregisterStream(persistentStreamId);
}
recoveryService.unregisterStream(streamId);
_streamCancelTokens.remove(messageId);
_messagePersistentStreamIds.remove(messageId);
if (!streamController.isClosed) {
streamController.close();
}
return;
}
// Try recovery through recovery service first
final recoveredStream = await recoveryService.recoverStream(streamId);
if (recoveredStream != null) {
debugPrint('Persistent: Successfully recovered SSE stream');
recoveredStream.listen(
(data) => streamController.add(data),
onDone: () {
if (persistentStreamId != null) {
persistentService.unregisterStream(persistentStreamId);
}
recoveryService.unregisterStream(streamId);
streamController.close();
},
onError: (e) {
if (persistentStreamId != null) {
persistentService.unregisterStream(persistentStreamId);
}
recoveryService.unregisterStream(streamId);
streamController.addError(e);
},
);
} else {
// Let persistent service handle recovery
debugPrint('Persistent: Delegating recovery to persistent service');
if (persistentStreamId != null) {
persistentService.unregisterStream(persistentStreamId);
}
recoveryService.unregisterStream(streamId);
streamController.addError(error);
}
},
cancelOnError:
false, // Continue processing despite individual event errors
);
// Register with persistent streaming service now that subscription is created
persistentStreamId = persistentService.registerStream(
subscription: streamSubscription,
controller: streamController,
recoveryCallback: recoveryCallback,
metadata: {
'conversationId': conversationId,
'messageId': messageId,
'sessionId': sessionId,
'lastChunkSequence': 0,
'lastContent': '',
'endpoint': '/api/chat/completions',
'requestData': data,
},
);
// Track the persistent stream id by message for cancellation
_messagePersistentStreamIds[messageId] = persistentStreamId;
} catch (e) {
debugPrint('Persistent: Failed to create SSE stream: $e');
if (persistentStreamId != null) {
persistentService.unregisterStream(persistentStreamId);
}
recoveryService.unregisterStream(streamId);
_streamCancelTokens.remove(messageId);
_messagePersistentStreamIds.remove(messageId);
if (e is DioException && e.response?.statusCode == 401) {
// Auth error - don't retry
streamController.addError('Authentication failed');
} else {
// Network or other error - trigger recovery
await recoveryCallback();
}
}
}
/// Process individual SSE events with content extraction and progress tracking
void _processSseEvent(
SSEEvent event,
StreamController<String> streamController,
int chunkSequence,
String accumulatedContent,
PersistentStreamingService persistentService,
String persistentStreamId,
) {
debugPrint(
'Persistent: SSE event - type: ${event.event}, data: ${event.data}',
);
// Handle completion signal
if (event.data == '[DONE]') {
debugPrint('Persistent: SSE stream finished with [DONE]');
// Ensure any open reasoning block is closed
_closeReasoningBlockIfOpen(streamController, persistentStreamId);
if (!streamController.isClosed) {
streamController.close();
}
return;
}
try {
final json = jsonDecode(event.data) as Map<String, dynamic>;
// Handle errors
if (json.containsKey('error')) {
final error = json['error'];
debugPrint('Persistent: SSE error: $error');
streamController.addError('Server error: $error');
return;
}
// Handle content streaming
if (json.containsKey('choices')) {
final choices = json['choices'] as List?;
if (choices != null && choices.isNotEmpty) {
final choice = choices[0] as Map<String, dynamic>;
if (choice.containsKey('delta')) {
final delta = choice['delta'] as Map<String, dynamic>;
// 1) Handle provider-native reasoning deltas (common keys)
final reasoning = delta['reasoning'] ?? delta['reasoning_content'];
if (reasoning is String && reasoning.isNotEmpty) {
// Open a reasoning block if not yet opened for this stream
_openReasoningBlockIfNeeded(streamController, persistentStreamId);
if (!streamController.isClosed) {
streamController.add(reasoning);
}
// We do NOT return here; model can send content alongside reasoning later
}
// 1a) Surface tool call deltas as lightweight status updates
// Some providers stream tool_calls without content; show a hint so UI isn't stuck
if (delta.containsKey('tool_calls')) {
final tc = delta['tool_calls'];
if (tc is List) {
for (final call in tc) {
if (call is Map<String, dynamic>) {
final fn = call['function'];
final name = (fn is Map && fn['name'] is String) ? fn['name'] as String : null;
if (name is String && name.isNotEmpty) {
final status = '\n<details type="tool_calls" done="false" name="$name"><summary>Executing...</summary>\n</details>\n';
if (!streamController.isClosed) {
streamController.add(status);
}
}
}
}
}
}
// Extract content
if (delta.containsKey('content')) {
final content = delta['content'] as String?;
if (content != null && content.isNotEmpty) {
debugPrint('Persistent: SSE content chunk: "$content"');
// Close any open reasoning block before normal content begins
_closeReasoningBlockIfOpen(streamController, persistentStreamId);
// Add content to stream
if (!streamController.isClosed) {
streamController.add(content);
}
// Update persistent service progress
persistentService.updateStreamProgress(
persistentStreamId,
chunkSequence: chunkSequence,
appendedContent: content,
);
accumulatedContent += content;
}
}
// Check for completion in delta
if (delta.containsKey('finish_reason')) {
final finishReason = delta['finish_reason'];
debugPrint(
'Persistent: Stream finished with reason: $finishReason',
);
// Do NOT close on tool_calls; server will continue with tool execution updates
if (finishReason != 'tool_calls') {
_closeReasoningBlockIfOpen(streamController, persistentStreamId);
if (!streamController.isClosed) {
streamController.close();
}
return;
}
}
} else if (choice.containsKey('finish_reason')) {
// Check for completion at choice level
final finishReason = choice['finish_reason'];
if (finishReason != null && finishReason != 'tool_calls') {
debugPrint(
'Persistent: Stream finished with reason: $finishReason',
);
_closeReasoningBlockIfOpen(streamController, persistentStreamId);
if (!streamController.isClosed) {
streamController.close();
}
return;
}
}
}
}
// Handle streaming chat/completions format variations
if (json.containsKey('delta')) {
final delta = json['delta'] as Map<String, dynamic>;
if (delta.containsKey('content')) {
final content = delta['content'] as String?;
if (content != null && content.isNotEmpty) {
debugPrint('Persistent: Direct delta content: "$content"');
if (!streamController.isClosed) {
streamController.add(content);
}
persistentService.updateStreamProgress(
persistentStreamId,
chunkSequence: chunkSequence,
appendedContent: content,
);
accumulatedContent += content;
}
}
}
// Handle OpenRouter-style streaming
if (json.containsKey('message')) {
final message = json['message'] as Map<String, dynamic>;
// Providers like Ollama may stream a separate thinking field
final thinking = message['thinking'];
if (thinking is String && thinking.isNotEmpty) {
_openReasoningBlockIfNeeded(streamController, persistentStreamId);
if (!streamController.isClosed) {
streamController.add(thinking);
}
}
if (message.containsKey('content')) {
final content = message['content'] as String?;
if (content != null && content.isNotEmpty) {
debugPrint('Persistent: Message content: "$content"');
_closeReasoningBlockIfOpen(streamController, persistentStreamId);
// Emit only the delta when server sends cumulative content
try {
final meta =
persistentService.getStreamMetadata(persistentStreamId);
final last = (meta != null && meta['lastContent'] is String)
? (meta['lastContent'] as String)
: '';
String toEmit;
if (content.startsWith(last)) {
toEmit = content.substring(last.length);
} else {
// Fallback: emit suffix after longest common prefix
int i = 0;
final minLen = last.length < content.length
? last.length
: content.length;
while (i < minLen && last.codeUnitAt(i) == content.codeUnitAt(i)) {
i++;
}
toEmit = content.substring(i);
}
if (toEmit.isNotEmpty && !streamController.isClosed) {
streamController.add(toEmit);
}
// Update persistent progress with the full content snapshot
persistentService.updateStreamProgress(
persistentStreamId,
chunkSequence: chunkSequence,
content: content,
);
} catch (_) {
// Best-effort fallback: append as-is
if (!streamController.isClosed) {
streamController.add(content);
}
persistentService.updateStreamProgress(
persistentStreamId,
chunkSequence: chunkSequence,
content: content,
);
}
}
}
}
// Handle Open WebUI aggregated content blocks
// Server emits top-level { content: "...serialized blocks..." } updates
if (json.containsKey('content')) {
final contentVal = json['content'];
if (contentVal is String && contentVal.isNotEmpty) {
// Close reasoning section before appending rich content
_closeReasoningBlockIfOpen(streamController, persistentStreamId);
// Emit only the delta when server sends cumulative content
try {
final meta =
persistentService.getStreamMetadata(persistentStreamId);
final last = (meta != null && meta['lastContent'] is String)
? (meta['lastContent'] as String)
: '';
String toEmit;
if (contentVal.startsWith(last)) {
toEmit = contentVal.substring(last.length);
} else {
// Fallback: emit suffix after longest common prefix
int i = 0;
final s = contentVal;
final minLen = last.length < s.length ? last.length : s.length;
while (i < minLen && last.codeUnitAt(i) == s.codeUnitAt(i)) {
i++;
}
toEmit = s.substring(i);
}
if (toEmit.isNotEmpty && !streamController.isClosed) {
streamController.add(toEmit);
}
// Update persistent progress with the full content snapshot
persistentService.updateStreamProgress(
persistentStreamId,
chunkSequence: chunkSequence,
content: contentVal,
);
} catch (_) {
// Best-effort fallback: append as-is
if (!streamController.isClosed) {
streamController.add(contentVal);
}
persistentService.updateStreamProgress(
persistentStreamId,
chunkSequence: chunkSequence,
content: contentVal,
);
}
}
}
} catch (e) {
debugPrint('Persistent: Error parsing SSE event data: $e');
// Don't fail the entire stream for one bad event
}
}
// ===== Reasoning block helpers =====
// Track open reasoning blocks by stream id
final Map<String, bool> _reasoningOpen = {};
void _openReasoningBlockIfNeeded(
StreamController<String> streamController,
String persistentStreamId,
) {
if (_reasoningOpen[persistentStreamId] == true) return;
_reasoningOpen[persistentStreamId] = true;
if (!streamController.isClosed) {
// Minimal details block (parser supports missing attrs)
streamController.add('<details type="reasoning"><summary>Thinking…</summary>\n');
}
}
void _closeReasoningBlockIfOpen(
StreamController<String> streamController,
String persistentStreamId,
) {
if (_reasoningOpen[persistentStreamId] == true) {
_reasoningOpen[persistentStreamId] = false;
if (!streamController.isClosed) {
streamController.add('\n</details>\n');
}
}
}
*/
// Legacy Socket.IO and older SSE methods removed
// File upload for RAG
Future<String> uploadFile(String filePath, String fileName) async {
@@ -4001,13 +3381,16 @@ class ApiService {
queryParameters: qp,
// Accept 404/405 to avoid throwing when endpoint is unsupported
options: Options(
validateStatus: (code) => code != null && (code < 400 || code == 404 || code == 405),
validateStatus: (code) =>
code != null && (code < 400 || code == 404 || code == 405),
),
);
// If not supported, quietly return empty results
if (response.statusCode == 404 || response.statusCode == 405) {
debugPrint('DEBUG: messages search endpoint not supported (status: ${response.statusCode})');
debugPrint(
'DEBUG: messages search endpoint not supported (status: ${response.statusCode})',
);
return [];
}

View File

@@ -1,385 +0,0 @@
import 'dart:async';
import 'dart:convert';
import 'package:flutter/foundation.dart';
/// Event data from Server-Sent Events stream
class SSEEvent {
final String? id;
final String? event;
final String data;
final int? retry;
SSEEvent({
this.id,
this.event,
required this.data,
this.retry,
});
}
/// Parser for Server-Sent Events with robust error handling and heartbeat support
class SSEParser {
final _controller = StreamController<SSEEvent>.broadcast();
String _buffer = '';
String? _currentId;
String? _currentEvent;
String _currentData = '';
int? _currentRetry;
// Heartbeat and health monitoring
Timer? _heartbeatTimer;
DateTime _lastDataReceived = DateTime.now();
Duration _heartbeatTimeout = const Duration(seconds: 30);
bool _isClosed = false;
// Recovery state
String? _lastEventId;
bool _reconnectRequested = false;
Stream<SSEEvent> get stream => _controller.stream;
// Events for monitoring connection health
final _heartbeatController = StreamController<void>.broadcast();
final _reconnectController = StreamController<String?>.broadcast();
Stream<void> get heartbeat => _heartbeatController.stream;
Stream<String?> get reconnectRequests => _reconnectController.stream;
SSEParser({Duration? heartbeatTimeout}) {
if (heartbeatTimeout != null) {
_heartbeatTimeout = heartbeatTimeout;
}
_startHeartbeatTimer();
}
/// Feed raw text data to the parser
void feed(String chunk) {
if (_isClosed) return;
_lastDataReceived = DateTime.now();
_buffer += chunk;
_processBuffer();
// Reset heartbeat timer since we received data
_resetHeartbeatTimer();
}
void _startHeartbeatTimer() {
_heartbeatTimer?.cancel();
_heartbeatTimer = Timer(_heartbeatTimeout, _onHeartbeatTimeout);
}
void _resetHeartbeatTimer() {
if (!_isClosed) {
_startHeartbeatTimer();
}
}
void _onHeartbeatTimeout() {
debugPrint('SSEParser: Heartbeat timeout - no data received in ${_heartbeatTimeout.inSeconds}s');
if (!_isClosed) {
// Emit heartbeat timeout event
_heartbeatController.add(null);
// Request reconnection with last event ID for recovery
_reconnectRequested = true;
_reconnectController.add(_lastEventId);
}
}
/// Process buffered data and emit events
void _processBuffer() {
try {
// Handle potential Unicode boundary issues by checking for incomplete characters
if (_buffer.isNotEmpty && _hasIncompleteUnicode(_buffer)) {
// Keep buffer intact if it might contain incomplete Unicode
return;
}
// Split by newlines but keep the last incomplete line
final lines = _buffer.split('\n');
// Keep the last line in buffer if it doesn't end with newline
if (!_buffer.endsWith('\n')) {
_buffer = lines.removeLast();
} else {
_buffer = '';
}
for (final line in lines) {
_processLine(line);
}
} catch (e) {
debugPrint('SSEParser: Error processing buffer: $e');
// Reset buffer on parsing error to prevent cascading failures
_buffer = '';
}
}
bool _hasIncompleteUnicode(String text) {
if (text.isEmpty) return false;
// Check if the last few characters might be incomplete Unicode
// This is a simple heuristic - in practice, Dart's UTF-8 decoder handles this
final lastChar = text.codeUnitAt(text.length - 1);
// If it's a high surrogate, we might be missing the low surrogate
return lastChar >= 0xD800 && lastChar <= 0xDBFF;
}
/// Process a single line according to SSE spec
void _processLine(String line) {
// Handle carriage return if present (some servers use \r\n)
final cleanLine = line.replaceAll('\r', '');
// Empty line signals end of event
if (cleanLine.trim().isEmpty) {
if (_currentData.isNotEmpty) {
_emitEvent();
}
_resetCurrentEvent();
return;
}
// Comment line (starts with :) - these serve as keep-alives
if (cleanLine.startsWith(':')) {
// Treat comments as heartbeat signals
_lastDataReceived = DateTime.now();
_resetHeartbeatTimer();
// Log processing indicators but don't spam debug output
if (cleanLine.contains('OPENROUTER') && kDebugMode) {
debugPrint('SSEParser: OpenRouter processing...');
} else if (cleanLine.contains('PROCESSING') && kDebugMode) {
debugPrint('SSEParser: Server processing...');
}
return;
}
// Parse field and value
final colonIndex = cleanLine.indexOf(':');
String field;
String value;
if (colonIndex == -1) {
field = cleanLine;
value = '';
} else {
field = cleanLine.substring(0, colonIndex);
value = cleanLine.substring(colonIndex + 1);
// Remove leading space from value if present
if (value.startsWith(' ')) {
value = value.substring(1);
}
}
// Process field according to SSE spec
switch (field) {
case 'data':
if (_currentData.isNotEmpty) {
_currentData += '\n';
}
_currentData += value;
break;
case 'event':
_currentEvent = value;
break;
case 'id':
_currentId = value;
_lastEventId = value; // Track for reconnection
break;
case 'retry':
final retryValue = int.tryParse(value);
if (retryValue != null) {
_currentRetry = retryValue;
}
break;
default:
// Ignore unknown fields
break;
}
}
/// Emit the current event
void _emitEvent() {
if (_isClosed) return;
try {
final event = SSEEvent(
id: _currentId,
event: _currentEvent,
data: _currentData,
retry: _currentRetry,
);
_controller.add(event);
// Track last event ID for potential reconnection
if (_currentId != null) {
_lastEventId = _currentId;
}
} catch (e) {
debugPrint('SSEParser: Error emitting event: $e');
_controller.addError(e);
}
}
/// Reset current event state
void _resetCurrentEvent() {
_currentEvent = null;
_currentData = '';
// Note: id and retry are not reset per SSE spec
}
/// Close the parser
void close() {
if (_isClosed) return;
_isClosed = true;
// Cancel heartbeat timer
_heartbeatTimer?.cancel();
_heartbeatTimer = null;
// Emit any remaining data
if (_currentData.isNotEmpty) {
_emitEvent();
}
// Close controllers
_controller.close();
_heartbeatController.close();
_reconnectController.close();
}
/// Get the last event ID for reconnection
String? get lastEventId => _lastEventId;
/// Check if parser is closed
bool get isClosed => _isClosed;
/// Check if reconnection was requested due to timeout
bool get reconnectRequested => _reconnectRequested;
/// Reset reconnect flag (call when reconnection is handled)
void resetReconnectFlag() {
_reconnectRequested = false;
}
/// Get time since last data was received
Duration get timeSinceLastData => DateTime.now().difference(_lastDataReceived);
/// Parse SSE events from a stream of bytes with robust error handling
static Stream<SSEEvent> parseStream(
Stream<List<int>> byteStream, {
Duration? heartbeatTimeout,
}) {
final parser = SSEParser(heartbeatTimeout: heartbeatTimeout);
// Convert bytes to text and feed to parser with error recovery
StreamSubscription? subscription;
subscription = byteStream
.transform(utf8.decoder)
.listen(
(chunk) {
try {
parser.feed(chunk);
} catch (e) {
debugPrint('SSEParser: Error feeding chunk: $e');
// Don't propagate feed errors - just skip the problematic chunk
}
},
onDone: () => parser.close(),
onError: (error) {
debugPrint('SSEParser: Stream error: $error');
parser._controller.addError(error);
},
cancelOnError: false, // Continue processing despite errors
);
// Clean up subscription when parser is closed
parser._controller.onCancel = () {
subscription?.cancel();
};
return parser.stream;
}
}
/// Transform a text stream into SSE events with heartbeat monitoring
class SSETransformer extends StreamTransformerBase<String, SSEEvent> {
final Duration? heartbeatTimeout;
const SSETransformer({this.heartbeatTimeout});
@override
Stream<SSEEvent> bind(Stream<String> stream) {
final parser = SSEParser(heartbeatTimeout: heartbeatTimeout);
StreamSubscription? subscription;
subscription = stream.listen(
(chunk) {
try {
parser.feed(chunk);
} catch (e) {
debugPrint('SSETransformer: Error feeding chunk: $e');
// Continue processing despite errors
}
},
onDone: () => parser.close(),
onError: (error) {
debugPrint('SSETransformer: Stream error: $error');
parser._controller.addError(error);
},
cancelOnError: false,
);
// Clean up subscription when parser is closed
parser._controller.onCancel = () {
subscription?.cancel();
};
return parser.stream;
}
}
/// Enhanced SSE event with additional metadata for resilient streaming
class EnhancedSSEEvent extends SSEEvent {
final DateTime timestamp;
final int sequenceNumber;
final String? sessionId;
EnhancedSSEEvent({
required super.data,
super.id,
super.event,
super.retry,
required this.timestamp,
required this.sequenceNumber,
this.sessionId,
});
factory EnhancedSSEEvent.fromSSEEvent(
SSEEvent event, {
required int sequenceNumber,
String? sessionId,
}) {
return EnhancedSSEEvent(
data: event.data,
id: event.id,
event: event.event,
retry: event.retry,
timestamp: DateTime.now(),
sequenceNumber: sequenceNumber,
sessionId: sessionId,
);
}
}

View File

@@ -1,237 +0,0 @@
import 'dart:async';
import 'dart:convert';
import 'package:flutter/foundation.dart';
import 'package:dio/dio.dart';
class StreamRecoveryService {
static const int maxRetries = 3;
static const Duration retryDelay = Duration(seconds: 2);
// Recovery state for each stream
final Map<String, StreamRecoveryState> _recoveryStates = {};
// Register a stream for recovery
void registerStream(String streamId, StreamRecoveryState state) {
_recoveryStates[streamId] = state;
debugPrint('StreamRecoveryService: Registered stream $streamId for recovery');
}
// Unregister a stream
void unregisterStream(String streamId) {
_recoveryStates.remove(streamId);
debugPrint('StreamRecoveryService: Unregistered stream $streamId');
}
// Attempt to recover a stream
Future<Stream<String>?> recoverStream(String streamId) async {
final state = _recoveryStates[streamId];
if (state == null) {
debugPrint('StreamRecoveryService: No recovery state for stream $streamId');
return null;
}
debugPrint('StreamRecoveryService: Attempting to recover stream $streamId');
debugPrint('StreamRecoveryService: Last received index: ${state.lastReceivedIndex}');
int retryCount = 0;
while (retryCount < maxRetries) {
try {
// Create recovery request with continuation token
final recoveryData = {
...state.originalRequest,
'continue_from_index': state.lastReceivedIndex,
'recovery_mode': true,
'stream_id': streamId,
};
// Add any accumulated content to avoid duplication
if (state.accumulatedContent.isNotEmpty) {
recoveryData['accumulated_content'] = state.accumulatedContent;
}
debugPrint('StreamRecoveryService: Recovery attempt ${retryCount + 1}/$maxRetries');
// Make recovery request
final dio = Dio(BaseOptions(
baseUrl: state.baseUrl,
connectTimeout: const Duration(seconds: 30),
receiveTimeout: null, // No timeout for streaming
headers: state.headers,
));
final response = await dio.post(
state.endpoint,
data: recoveryData,
options: Options(
headers: {
'Accept': 'text/event-stream',
'Cache-Control': 'no-cache',
},
responseType: ResponseType.stream,
),
);
if (response.statusCode == 200) {
debugPrint('StreamRecoveryService: Successfully recovered stream $streamId');
// Create new stream from recovered response
final stream = _processRecoveredStream(
response.data.stream,
state,
streamId,
);
return stream;
}
} catch (e) {
debugPrint('StreamRecoveryService: Recovery attempt failed: $e');
retryCount++;
if (retryCount < maxRetries) {
await Future.delayed(retryDelay * retryCount);
}
}
}
debugPrint('StreamRecoveryService: Failed to recover stream $streamId after $maxRetries attempts');
return null;
}
// Process recovered stream and filter out duplicates
Stream<String> _processRecoveredStream(
Stream<List<int>> rawStream,
StreamRecoveryState state,
String streamId,
) {
final controller = StreamController<String>();
String buffer = '';
bool skipUntilNewContent = state.lastReceivedIndex > 0;
int currentIndex = 0;
rawStream.listen(
(chunk) {
final text = utf8.decode(chunk, allowMalformed: true);
buffer += text;
// Process complete SSE events
while (buffer.contains('\n')) {
final lineEnd = buffer.indexOf('\n');
final line = buffer.substring(0, lineEnd).trim();
buffer = buffer.substring(lineEnd + 1);
if (line.startsWith('data: ')) {
final data = line.substring(6);
if (data == '[DONE]') {
controller.close();
return;
}
// Parse JSON data
try {
final json = jsonDecode(data);
// Check if we should skip this content (already received)
if (skipUntilNewContent) {
currentIndex++;
if (currentIndex <= state.lastReceivedIndex) {
debugPrint('StreamRecoveryService: Skipping duplicate content at index $currentIndex');
continue;
}
skipUntilNewContent = false;
}
// Extract content from JSON
if (json['choices'] != null && json['choices'].isNotEmpty) {
final delta = json['choices'][0]['delta'];
if (delta != null && delta['content'] != null) {
final content = delta['content'] as String;
// Update recovery state
state.lastReceivedIndex = currentIndex;
state.accumulatedContent += content;
// Emit recovered content
controller.add(content);
currentIndex++;
}
}
} catch (e) {
debugPrint('StreamRecoveryService: Error parsing recovered data: $e');
}
}
}
},
onDone: () {
debugPrint('StreamRecoveryService: Recovered stream completed');
controller.close();
unregisterStream(streamId);
},
onError: (error) {
debugPrint('StreamRecoveryService: Recovered stream error: $error');
controller.addError(error);
// Attempt another recovery
Future.delayed(retryDelay, () async {
final recoveredStream = await recoverStream(streamId);
if (recoveredStream != null) {
recoveredStream.listen(
(data) => controller.add(data),
onDone: () => controller.close(),
onError: (e) => controller.addError(e),
);
} else {
controller.close();
}
});
},
);
return controller.stream;
}
// Update recovery state with new content
void updateStreamProgress(String streamId, String content, int index) {
final state = _recoveryStates[streamId];
if (state != null) {
state.lastReceivedIndex = index;
state.accumulatedContent += content;
}
}
// Clear recovery state for a stream
void clearStreamState(String streamId) {
_recoveryStates.remove(streamId);
}
}
// Recovery state for a stream
class StreamRecoveryState {
final String baseUrl;
final String endpoint;
final Map<String, dynamic> originalRequest;
final Map<String, String> headers;
int lastReceivedIndex;
String accumulatedContent;
DateTime lastActivity;
StreamRecoveryState({
required this.baseUrl,
required this.endpoint,
required this.originalRequest,
required this.headers,
this.lastReceivedIndex = 0,
this.accumulatedContent = '',
}) : lastActivity = DateTime.now();
// Check if stream is stale (no activity for too long)
bool get isStale {
return DateTime.now().difference(lastActivity).inMinutes > 5;
}
// Update activity timestamp
void updateActivity() {
lastActivity = DateTime.now();
}
}