From 1ce981937df334b529919f3d13c6048d22a3a0fd Mon Sep 17 00:00:00 2001 From: cogwheel0 <172976095+cogwheel0@users.noreply.github.com> Date: Tue, 2 Sep 2025 00:13:30 +0530 Subject: [PATCH] refactor: sse cleanup --- lib/core/services/api_service.dart | 763 ++---------------- lib/core/services/sse_parser.dart | 385 --------- .../services/stream_recovery_service.dart | 237 ------ 3 files changed, 73 insertions(+), 1312 deletions(-) delete mode 100644 lib/core/services/sse_parser.dart delete mode 100644 lib/core/services/stream_recovery_service.dart diff --git a/lib/core/services/api_service.dart b/lib/core/services/api_service.dart index 014f51e..441571b 100644 --- a/lib/core/services/api_service.dart +++ b/lib/core/services/api_service.dart @@ -629,10 +629,11 @@ class ApiService { final toolCalls = (msgData['tool_calls'] is List) ? (msgData['tool_calls'] as List) : (historyMsg != null && historyMsg['tool_calls'] is List) - ? (historyMsg['tool_calls'] as List) - : null; + ? (historyMsg['tool_calls'] as List) + : null; - if ((msgData['role']?.toString() == 'assistant') && toolCalls is List) { + if ((msgData['role']?.toString() == 'assistant') && + toolCalls is List) { // Collect subsequent tool results associated with this assistant turn final List> results = []; int j = idx + 1; @@ -674,7 +675,10 @@ class ApiService { } // Default path: parse message as-is - final message = _parseOpenWebUIMessage(msgData, historyMsg: historyMsg); + final message = _parseOpenWebUIMessage( + msgData, + historyMsg: historyMsg, + ); messages.add(message); debugPrint( 'DEBUG: Successfully parsed message: ${message.id} - ${message.role}', @@ -715,7 +719,8 @@ class ApiService { // Prefer richer content from history entry if present dynamic content = msgData['content']; if ((content == null || (content is String && content.isEmpty)) && - historyMsg != null && historyMsg['content'] != null) { + historyMsg != null && + historyMsg['content'] != null) { content = historyMsg['content']; } String contentString; @@ -741,8 +746,8 @@ class ApiService { final toolCallsList = (msgData['tool_calls'] is List) ? (msgData['tool_calls'] as List) : (historyMsg != null && historyMsg['tool_calls'] is List) - ? (historyMsg['tool_calls'] as List) - : null; + ? (historyMsg['tool_calls'] as List) + : null; if (contentString.trim().isEmpty && toolCallsList is List) { final synthesized = _synthesizeToolDetailsFromToolCalls(toolCallsList); if (synthesized.isNotEmpty) { @@ -824,11 +829,15 @@ class ApiService { for (final c in toolCalls) { if (c is! Map) continue; final func = c['function'] as Map?; - final name = (func != null ? func['name'] : c['name'])?.toString() ?? 'tool'; - final id = (c['id']?.toString() ?? 'call_${DateTime.now().millisecondsSinceEpoch}'); + final name = + (func != null ? func['name'] : c['name'])?.toString() ?? 'tool'; + final id = + (c['id']?.toString() ?? + 'call_${DateTime.now().millisecondsSinceEpoch}'); final done = (c['done']?.toString() ?? 'true'); final argsRaw = func != null ? func['arguments'] : c['arguments']; - final resRaw = c['result'] ?? c['output'] ?? (func != null ? func['result'] : null); + final resRaw = + c['result'] ?? c['output'] ?? (func != null ? func['result'] : null); final argsStr = _jsonStringify(argsRaw); final resStr = resRaw != null ? _jsonStringify(resRaw) : null; final attrs = StringBuffer() @@ -840,7 +849,9 @@ class ApiService { if (resStr != null && resStr.isNotEmpty) { attrs.write(' result="${_escapeHtmlAttr(resStr)}"'); } - buf.writeln('
Tool Executed'); + buf.writeln( + '
Tool Executed', + ); buf.writeln('
'); } return buf.toString().trim(); @@ -860,8 +871,11 @@ class ApiService { for (final c in toolCalls) { if (c is! Map) continue; final func = c['function'] as Map?; - final name = (func != null ? func['name'] : c['name'])?.toString() ?? 'tool'; - final id = (c['id']?.toString() ?? 'call_${DateTime.now().millisecondsSinceEpoch}'); + final name = + (func != null ? func['name'] : c['name'])?.toString() ?? 'tool'; + final id = + (c['id']?.toString() ?? + 'call_${DateTime.now().millisecondsSinceEpoch}'); final argsRaw = func != null ? func['arguments'] : c['arguments']; final argsStr = _jsonStringify(argsRaw); final resultEntry = resultsMap[id]; @@ -872,7 +886,9 @@ class ApiService { final attrs = StringBuffer() ..write('type="tool_calls"') - ..write(' done="${_escapeHtmlAttr(resultEntry != null ? 'true' : 'false')}"') + ..write( + ' done="${_escapeHtmlAttr(resultEntry != null ? 'true' : 'false')}"', + ) ..write(' id="${_escapeHtmlAttr(id)}"') ..write(' name="${_escapeHtmlAttr(name)}"') ..write(' arguments="${_escapeHtmlAttr(argsStr)}"'); @@ -883,7 +899,9 @@ class ApiService { attrs.write(' files="${_escapeHtmlAttr(filesStr)}"'); } - buf.writeln('
${resultEntry != null ? 'Tool Executed' : 'Executing...'}'); + buf.writeln( + '
${resultEntry != null ? 'Tool Executed' : 'Executing...'}', + ); buf.writeln('
'); } return buf.toString().trim(); @@ -897,14 +915,19 @@ class ApiService { if (type == null) continue; // OpenWebUI content-blocks shape: { type: 'tool_calls', content: [...], results: [...] } if (type == 'tool_calls') { - final calls = (item['content'] is List) ? (item['content'] as List) : []; + final calls = (item['content'] is List) + ? (item['content'] as List) + : []; final results = >[]; if (item['results'] is List) { for (final r in (item['results'] as List)) { if (r is Map) results.add(r); } } - final synthesized = _synthesizeToolDetailsFromToolCallsWithResults(calls, results); + final synthesized = _synthesizeToolDetailsFromToolCallsWithResults( + calls, + results, + ); if (synthesized.isNotEmpty) buf.writeln(synthesized); continue; } @@ -912,12 +935,16 @@ class ApiService { // Heuristics: handle other variants (single tool/function call entries) if (type == 'tool_call' || type == 'function_call') { final name = (item['name'] ?? item['tool'] ?? 'tool').toString(); - final id = (item['id']?.toString() ?? 'call_${DateTime.now().millisecondsSinceEpoch}'); + final id = + (item['id']?.toString() ?? + 'call_${DateTime.now().millisecondsSinceEpoch}'); final argsStr = _jsonStringify(item['arguments'] ?? item['args']); final resStr = item['result'] ?? item['output'] ?? item['response']; final attrs = StringBuffer() ..write('type="tool_calls"') - ..write(' done="${_escapeHtmlAttr(resStr != null ? 'true' : 'false')}"') + ..write( + ' done="${_escapeHtmlAttr(resStr != null ? 'true' : 'false')}"', + ) ..write(' id="${_escapeHtmlAttr(id)}"') ..write(' name="${_escapeHtmlAttr(name)}"') ..write(' arguments="${_escapeHtmlAttr(argsStr)}"'); @@ -925,7 +952,9 @@ class ApiService { final r = _jsonStringify(resStr); if (r.isNotEmpty) attrs.write(' result="${_escapeHtmlAttr(r)}"'); } - buf.writeln('
${resStr != null ? 'Tool Executed' : 'Executing...'}'); + buf.writeln( + '
${resStr != null ? 'Tool Executed' : 'Executing...'}', + ); buf.writeln('
'); } } @@ -2589,7 +2618,8 @@ class ApiService { // Generate unique IDs final messageId = const Uuid().v4(); - final sessionId = (sessionIdOverride != null && sessionIdOverride.isNotEmpty) + final sessionId = + (sessionIdOverride != null && sessionIdOverride.isNotEmpty) ? sessionIdOverride : const Uuid().v4().substring(0, 20); @@ -2679,7 +2709,8 @@ class ApiService { // It allows the client to display a collapsible "Thinking" section. data['params'] = { 'reasoning_tags': true, - 'reasoning_effort': 'medium', // Safe default; providers ignore if unsupported + 'reasoning_effort': + 'medium', // Safe default; providers ignore if unsupported }; // Add tool_ids if provided (Open-WebUI expects tool_ids as array of strings) @@ -2690,7 +2721,8 @@ class ApiService { // Hint server to use native function calling when tools are selected // This enables provider-native tool execution paths and consistent UI events try { - final params = (data['params'] as Map? ) ?? {}; + final params = + (data['params'] as Map?) ?? {}; params['function_calling'] = 'native'; data['params'] = params; debugPrint('DEBUG: Set params.function_calling = native'); @@ -2702,7 +2734,9 @@ class ApiService { // Include tool_servers if provided (for native function calling with OpenAPI servers) if (toolServers != null && toolServers.isNotEmpty) { data['tool_servers'] = toolServers; - debugPrint('DEBUG: Including tool_servers in request (${toolServers.length})'); + debugPrint( + 'DEBUG: Including tool_servers in request (${toolServers.length})', + ); } // Include non-image files at the top level as expected by Open WebUI @@ -2761,7 +2795,9 @@ class ApiService { try { final resp = await _dio.post('/api/chat/completions', data: data); final respData = resp.data; - final taskId = (respData is Map) ? (respData['task_id']?.toString()) : null; + final taskId = (respData is Map) + ? (respData['task_id']?.toString()) + : null; debugPrint('DEBUG: Background task created: $taskId'); // If no session/socket provided, fall back to polling for updates. @@ -2838,8 +2874,9 @@ class ApiService { // Locate assistant content from multiple shapes String content = ''; - Map? chatObj = - (data['chat'] is Map) ? data['chat'] as Map : null; + Map? chatObj = (data['chat'] is Map) + ? data['chat'] as Map + : null; // 1) Preferred: chat.messages (list) – try exact id first if (chatObj != null && chatObj['messages'] is List) { @@ -2941,10 +2978,12 @@ class ApiService { if (content.isEmpty && chatObj != null) { final history = chatObj['history']; if (history is Map && history['messages'] is Map) { - final Map msgMapDyn = history['messages'] as Map; + final Map msgMapDyn = + history['messages'] as Map; // Iterate by values; no guaranteed ordering, but often sufficient for (final entry in msgMapDyn.values) { - if (entry is Map && (entry['role']?.toString() == 'assistant')) { + if (entry is Map && + (entry['role']?.toString() == 'assistant')) { final rawContent = entry['content']; if (rawContent is String) { content = rawContent; @@ -3024,665 +3063,6 @@ class ApiService { } } catch (_) {} } - // SSE helpers removed: background task flow is the only path now. - /* void _streamSSE( - Map data, - StreamController streamController, - String messageId, - ) async { - final persistentService = PersistentStreamingService(); - final recoveryService = StreamRecoveryService(); - final streamId = DateTime.now().millisecondsSinceEpoch.toString(); - // Create a cancel token for this SSE request and store it by message - final cancelToken = CancelToken(); - _streamCancelTokens[messageId] = cancelToken; - - // Extract metadata for recovery - final conversationId = data['conversation_id'] ?? data['chat_id'] ?? ''; - final sessionId = data['session_id'] ?? const Uuid().v4().substring(0, 20); - - // Register stream for recovery - recoveryService.registerStream( - streamId, - StreamRecoveryState( - baseUrl: serverConfig.url, - endpoint: '/api/chat/completions', - originalRequest: data, - headers: { - 'Authorization': 'Bearer ${_authInterceptor.authToken}', - 'Accept': 'text/event-stream', - 'Cache-Control': 'no-cache', - 'Connection': 'keep-alive', - }, - ), - ); - - // Recovery callback for persistent service - Future recoveryCallback() async { - debugPrint('Persistent: Attempting to recover stream $streamId'); - // Restart the streaming request - _streamSSE(data, streamController, messageId); - } - - // Declare variables that need to be accessible in catch block - String? persistentStreamId; - - try { - debugPrint( - 'DEBUG: Making SSE request with parser to /api/chat/completions', - ); - - // Create a fresh Dio instance optimized for SSE streaming - final streamDio = Dio( - BaseOptions( - baseUrl: serverConfig.url, - connectTimeout: const Duration( - seconds: 60, - ), // Longer for initial connection - receiveTimeout: null, // No timeout for streaming - sendTimeout: const Duration(seconds: 30), - headers: { - 'Authorization': 'Bearer ${_authInterceptor.authToken}', - 'Accept': 'text/event-stream', - 'Cache-Control': 'no-cache', - 'Connection': 'keep-alive', - ...serverConfig.customHeaders, // Include any custom headers - }, - validateStatus: (status) => status != null && status < 400, - followRedirects: true, - maxRedirects: 3, - ), - ); - - DebugLogger.log('Sending SSE request with data structure logged'); - - final response = await streamDio.post( - '/api/chat/completions', - data: data, // Pass data directly as Map - options: Options( - responseType: ResponseType.stream, - receiveTimeout: null, - ), - cancelToken: cancelToken, - ); - - debugPrint('DEBUG: SSE response status: ${response.statusCode}'); - debugPrint('DEBUG: SSE response headers: ${response.headers}'); - debugPrint( - 'DEBUG: SSE content-type: ${response.headers.value('content-type')}', - ); - - if (response.statusCode != 200) { - throw Exception( - 'HTTP ${response.statusCode}: Failed to start streaming', - ); - } - - // Check if we got SSE or JSON response - final contentType = response.headers.value('content-type') ?? ''; - if (!contentType.contains('text/event-stream')) { - debugPrint('WARNING: Expected SSE but got content-type: $contentType'); - debugPrint( - 'WARNING: This usually means the server didn\'t receive the streaming parameters', - ); - - // Try to read the response to see what we got - final stream = response.data.stream as Stream>; - final bytes = await stream.toList(); - final fullBytes = bytes.expand((x) => x).toList(); - final responseText = utf8.decode(fullBytes); - debugPrint('DEBUG: Non-SSE response length: ${responseText.length}'); - - // If it's JSON, parse and handle it - if (contentType.contains('application/json')) { - try { - final json = jsonDecode(responseText); - - // Check if it's an error - if (json is Map && json.containsKey('error')) { - debugPrint('ERROR: Server returned error: ${json['error']}'); - streamController.addError('Server error: ${json['error']}'); - return; - } - - // Try to extract content from non-streaming response - if (json is Map && json.containsKey('choices')) { - final choices = json['choices'] as List?; - if (choices != null && choices.isNotEmpty) { - final choice = choices[0] as Map; - if (choice.containsKey('message')) { - final message = choice['message'] as Map; - final content = message['content']?.toString() ?? ''; - if (content.isNotEmpty) { - debugPrint( - 'DEBUG: Successfully extracted content from JSON response', - ); - // Stream the content word by word for better UX - final words = content.split(' '); - for (final word in words) { - streamController.add('$word '); - await Future.delayed(const Duration(milliseconds: 20)); - } - } - } - } - } - - // Log what we got if we couldn't extract content - if (!streamController.isClosed) { - DebugLogger.log('JSON response structure: ${json.keys}'); - DebugLogger.log('JSON response received (full data suppressed)'); - - // Check if it's a task-based response - if (json is Map && json.containsKey('task_id')) { - debugPrint( - 'DEBUG: Got task-based response with task_id: ${json['task_id']}', - ); - debugPrint('DEBUG: Status: ${json['status']}'); - // This might be a polling-based async pattern - // TODO: Implement polling for task completion - } - } - } catch (e) { - debugPrint('ERROR: Failed to parse JSON response: $e'); - // Try to show something to the user - streamController.add( - 'Response received but could not be parsed properly.', - ); - } - } else { - // Not JSON, might be plain text - debugPrint('DEBUG: Got non-JSON response, treating as plain text'); - if (responseText.isNotEmpty && responseText.length < 10000) { - streamController.add(responseText); - } - } - - streamController.close(); - return; - } - - // Parse SSE stream using enhanced parser with heartbeat monitoring - final rawStream = response.data.stream; - - // Handle the stream properly based on its actual type - Stream> byteStream; - if (rawStream is Stream) { - byteStream = rawStream.map((uint8list) => uint8list.toList()); - } else { - byteStream = rawStream as Stream>; - } - - // Parse SSE events with enhanced parser (includes heartbeat monitoring) - final sseParser = SSEParser( - heartbeatTimeout: const Duration(seconds: 45), - ); - int contentIndex = 0; - int chunkSequence = 0; - String accumulatedContent = ''; - - // Monitor parser heartbeat for reconnection - sseParser.heartbeat.listen((_) { - debugPrint('Persistent: SSE heartbeat timeout detected'); - }); - - sseParser.reconnectRequests.listen((lastEventId) { - debugPrint( - 'Persistent: SSE reconnection requested, lastEventId: $lastEventId', - ); - // The persistent service will handle the reconnection - }); - - // Convert bytes to SSE events - final sseEventStream = SSEParser.parseStream( - byteStream, - heartbeatTimeout: const Duration(seconds: 45), - ); - - // Listen to the SSE event stream - final streamSubscription = sseEventStream.listen( - (event) { - try { - chunkSequence++; - - // Update parser with chunk data for heartbeat monitoring - sseParser.feed(''); // Reset heartbeat timer - - // Process the event data - if (persistentStreamId != null) { - _processSseEvent( - event, - streamController, - chunkSequence, - accumulatedContent, - persistentService, - persistentStreamId, - ); - } - - // Update recovery state - recoveryService.updateStreamProgress( - streamId, - event.data, - contentIndex++, - ); - } catch (e) { - debugPrint('Persistent: Error processing SSE event: $e'); - streamController.addError(e); - } - }, - onDone: () { - debugPrint('Persistent: SSE stream completed normally'); - if (persistentStreamId != null) { - persistentService.unregisterStream(persistentStreamId); - } - recoveryService.unregisterStream(streamId); - _streamCancelTokens.remove(messageId); - _messagePersistentStreamIds.remove(messageId); - if (!streamController.isClosed) { - streamController.close(); - } - }, - onError: (error) async { - debugPrint('Persistent: SSE stream error: $error'); - // If this was a user cancellation, close quietly - if (error is DioException && error.type == DioExceptionType.cancel) { - if (persistentStreamId != null) { - persistentService.unregisterStream(persistentStreamId); - } - recoveryService.unregisterStream(streamId); - _streamCancelTokens.remove(messageId); - _messagePersistentStreamIds.remove(messageId); - if (!streamController.isClosed) { - streamController.close(); - } - return; - } - - // Try recovery through recovery service first - final recoveredStream = await recoveryService.recoverStream(streamId); - - if (recoveredStream != null) { - debugPrint('Persistent: Successfully recovered SSE stream'); - recoveredStream.listen( - (data) => streamController.add(data), - onDone: () { - if (persistentStreamId != null) { - persistentService.unregisterStream(persistentStreamId); - } - recoveryService.unregisterStream(streamId); - streamController.close(); - }, - onError: (e) { - if (persistentStreamId != null) { - persistentService.unregisterStream(persistentStreamId); - } - recoveryService.unregisterStream(streamId); - streamController.addError(e); - }, - ); - } else { - // Let persistent service handle recovery - debugPrint('Persistent: Delegating recovery to persistent service'); - if (persistentStreamId != null) { - persistentService.unregisterStream(persistentStreamId); - } - recoveryService.unregisterStream(streamId); - streamController.addError(error); - } - }, - cancelOnError: - false, // Continue processing despite individual event errors - ); - - // Register with persistent streaming service now that subscription is created - persistentStreamId = persistentService.registerStream( - subscription: streamSubscription, - controller: streamController, - recoveryCallback: recoveryCallback, - metadata: { - 'conversationId': conversationId, - 'messageId': messageId, - 'sessionId': sessionId, - 'lastChunkSequence': 0, - 'lastContent': '', - 'endpoint': '/api/chat/completions', - 'requestData': data, - }, - ); - // Track the persistent stream id by message for cancellation - _messagePersistentStreamIds[messageId] = persistentStreamId; - } catch (e) { - debugPrint('Persistent: Failed to create SSE stream: $e'); - if (persistentStreamId != null) { - persistentService.unregisterStream(persistentStreamId); - } - recoveryService.unregisterStream(streamId); - _streamCancelTokens.remove(messageId); - _messagePersistentStreamIds.remove(messageId); - - if (e is DioException && e.response?.statusCode == 401) { - // Auth error - don't retry - streamController.addError('Authentication failed'); - } else { - // Network or other error - trigger recovery - await recoveryCallback(); - } - } - } - - /// Process individual SSE events with content extraction and progress tracking - void _processSseEvent( - SSEEvent event, - StreamController streamController, - int chunkSequence, - String accumulatedContent, - PersistentStreamingService persistentService, - String persistentStreamId, - ) { - debugPrint( - 'Persistent: SSE event - type: ${event.event}, data: ${event.data}', - ); - - // Handle completion signal - if (event.data == '[DONE]') { - debugPrint('Persistent: SSE stream finished with [DONE]'); - // Ensure any open reasoning block is closed - _closeReasoningBlockIfOpen(streamController, persistentStreamId); - if (!streamController.isClosed) { - streamController.close(); - } - return; - } - - try { - final json = jsonDecode(event.data) as Map; - - // Handle errors - if (json.containsKey('error')) { - final error = json['error']; - debugPrint('Persistent: SSE error: $error'); - streamController.addError('Server error: $error'); - return; - } - - // Handle content streaming - if (json.containsKey('choices')) { - final choices = json['choices'] as List?; - if (choices != null && choices.isNotEmpty) { - final choice = choices[0] as Map; - - if (choice.containsKey('delta')) { - final delta = choice['delta'] as Map; - - // 1) Handle provider-native reasoning deltas (common keys) - final reasoning = delta['reasoning'] ?? delta['reasoning_content']; - if (reasoning is String && reasoning.isNotEmpty) { - // Open a reasoning block if not yet opened for this stream - _openReasoningBlockIfNeeded(streamController, persistentStreamId); - - if (!streamController.isClosed) { - streamController.add(reasoning); - } - - // We do NOT return here; model can send content alongside reasoning later - } - - // 1a) Surface tool call deltas as lightweight status updates - // Some providers stream tool_calls without content; show a hint so UI isn't stuck - if (delta.containsKey('tool_calls')) { - final tc = delta['tool_calls']; - if (tc is List) { - for (final call in tc) { - if (call is Map) { - final fn = call['function']; - final name = (fn is Map && fn['name'] is String) ? fn['name'] as String : null; - if (name is String && name.isNotEmpty) { - final status = '\n
Executing...\n
\n'; - if (!streamController.isClosed) { - streamController.add(status); - } - } - } - } - } - } - - // Extract content - if (delta.containsKey('content')) { - final content = delta['content'] as String?; - if (content != null && content.isNotEmpty) { - debugPrint('Persistent: SSE content chunk: "$content"'); - - // Close any open reasoning block before normal content begins - _closeReasoningBlockIfOpen(streamController, persistentStreamId); - - // Add content to stream - if (!streamController.isClosed) { - streamController.add(content); - } - - // Update persistent service progress - persistentService.updateStreamProgress( - persistentStreamId, - chunkSequence: chunkSequence, - appendedContent: content, - ); - - accumulatedContent += content; - } - } - - // Check for completion in delta - if (delta.containsKey('finish_reason')) { - final finishReason = delta['finish_reason']; - debugPrint( - 'Persistent: Stream finished with reason: $finishReason', - ); - // Do NOT close on tool_calls; server will continue with tool execution updates - if (finishReason != 'tool_calls') { - _closeReasoningBlockIfOpen(streamController, persistentStreamId); - if (!streamController.isClosed) { - streamController.close(); - } - return; - } - } - } else if (choice.containsKey('finish_reason')) { - // Check for completion at choice level - final finishReason = choice['finish_reason']; - if (finishReason != null && finishReason != 'tool_calls') { - debugPrint( - 'Persistent: Stream finished with reason: $finishReason', - ); - _closeReasoningBlockIfOpen(streamController, persistentStreamId); - if (!streamController.isClosed) { - streamController.close(); - } - return; - } - } - } - } - - // Handle streaming chat/completions format variations - if (json.containsKey('delta')) { - final delta = json['delta'] as Map; - if (delta.containsKey('content')) { - final content = delta['content'] as String?; - if (content != null && content.isNotEmpty) { - debugPrint('Persistent: Direct delta content: "$content"'); - - if (!streamController.isClosed) { - streamController.add(content); - } - - persistentService.updateStreamProgress( - persistentStreamId, - chunkSequence: chunkSequence, - appendedContent: content, - ); - - accumulatedContent += content; - } - } - } - - // Handle OpenRouter-style streaming - if (json.containsKey('message')) { - final message = json['message'] as Map; - // Providers like Ollama may stream a separate thinking field - final thinking = message['thinking']; - if (thinking is String && thinking.isNotEmpty) { - _openReasoningBlockIfNeeded(streamController, persistentStreamId); - if (!streamController.isClosed) { - streamController.add(thinking); - } - } - if (message.containsKey('content')) { - final content = message['content'] as String?; - if (content != null && content.isNotEmpty) { - debugPrint('Persistent: Message content: "$content"'); - - _closeReasoningBlockIfOpen(streamController, persistentStreamId); - - // Emit only the delta when server sends cumulative content - try { - final meta = - persistentService.getStreamMetadata(persistentStreamId); - final last = (meta != null && meta['lastContent'] is String) - ? (meta['lastContent'] as String) - : ''; - - String toEmit; - if (content.startsWith(last)) { - toEmit = content.substring(last.length); - } else { - // Fallback: emit suffix after longest common prefix - int i = 0; - final minLen = last.length < content.length - ? last.length - : content.length; - while (i < minLen && last.codeUnitAt(i) == content.codeUnitAt(i)) { - i++; - } - toEmit = content.substring(i); - } - - if (toEmit.isNotEmpty && !streamController.isClosed) { - streamController.add(toEmit); - } - - // Update persistent progress with the full content snapshot - persistentService.updateStreamProgress( - persistentStreamId, - chunkSequence: chunkSequence, - content: content, - ); - } catch (_) { - // Best-effort fallback: append as-is - if (!streamController.isClosed) { - streamController.add(content); - } - persistentService.updateStreamProgress( - persistentStreamId, - chunkSequence: chunkSequence, - content: content, - ); - } - } - } - } - - // Handle Open WebUI aggregated content blocks - // Server emits top-level { content: "...serialized blocks..." } updates - if (json.containsKey('content')) { - final contentVal = json['content']; - if (contentVal is String && contentVal.isNotEmpty) { - // Close reasoning section before appending rich content - _closeReasoningBlockIfOpen(streamController, persistentStreamId); - - // Emit only the delta when server sends cumulative content - try { - final meta = - persistentService.getStreamMetadata(persistentStreamId); - final last = (meta != null && meta['lastContent'] is String) - ? (meta['lastContent'] as String) - : ''; - - String toEmit; - if (contentVal.startsWith(last)) { - toEmit = contentVal.substring(last.length); - } else { - // Fallback: emit suffix after longest common prefix - int i = 0; - final s = contentVal; - final minLen = last.length < s.length ? last.length : s.length; - while (i < minLen && last.codeUnitAt(i) == s.codeUnitAt(i)) { - i++; - } - toEmit = s.substring(i); - } - - if (toEmit.isNotEmpty && !streamController.isClosed) { - streamController.add(toEmit); - } - - // Update persistent progress with the full content snapshot - persistentService.updateStreamProgress( - persistentStreamId, - chunkSequence: chunkSequence, - content: contentVal, - ); - } catch (_) { - // Best-effort fallback: append as-is - if (!streamController.isClosed) { - streamController.add(contentVal); - } - persistentService.updateStreamProgress( - persistentStreamId, - chunkSequence: chunkSequence, - content: contentVal, - ); - } - } - } - } catch (e) { - debugPrint('Persistent: Error parsing SSE event data: $e'); - // Don't fail the entire stream for one bad event - } - } - - // ===== Reasoning block helpers ===== - // Track open reasoning blocks by stream id - final Map _reasoningOpen = {}; - - void _openReasoningBlockIfNeeded( - StreamController streamController, - String persistentStreamId, - ) { - if (_reasoningOpen[persistentStreamId] == true) return; - _reasoningOpen[persistentStreamId] = true; - if (!streamController.isClosed) { - // Minimal details block (parser supports missing attrs) - streamController.add('
Thinking…\n'); - } - } - - void _closeReasoningBlockIfOpen( - StreamController streamController, - String persistentStreamId, - ) { - if (_reasoningOpen[persistentStreamId] == true) { - _reasoningOpen[persistentStreamId] = false; - if (!streamController.isClosed) { - streamController.add('\n
\n'); - } - } - } - - */ - // Legacy Socket.IO and older SSE methods removed // File upload for RAG Future uploadFile(String filePath, String fileName) async { @@ -4001,13 +3381,16 @@ class ApiService { queryParameters: qp, // Accept 404/405 to avoid throwing when endpoint is unsupported options: Options( - validateStatus: (code) => code != null && (code < 400 || code == 404 || code == 405), + validateStatus: (code) => + code != null && (code < 400 || code == 404 || code == 405), ), ); // If not supported, quietly return empty results if (response.statusCode == 404 || response.statusCode == 405) { - debugPrint('DEBUG: messages search endpoint not supported (status: ${response.statusCode})'); + debugPrint( + 'DEBUG: messages search endpoint not supported (status: ${response.statusCode})', + ); return []; } diff --git a/lib/core/services/sse_parser.dart b/lib/core/services/sse_parser.dart deleted file mode 100644 index fad916f..0000000 --- a/lib/core/services/sse_parser.dart +++ /dev/null @@ -1,385 +0,0 @@ -import 'dart:async'; -import 'dart:convert'; -import 'package:flutter/foundation.dart'; - -/// Event data from Server-Sent Events stream -class SSEEvent { - final String? id; - final String? event; - final String data; - final int? retry; - - SSEEvent({ - this.id, - this.event, - required this.data, - this.retry, - }); -} - -/// Parser for Server-Sent Events with robust error handling and heartbeat support -class SSEParser { - final _controller = StreamController.broadcast(); - - String _buffer = ''; - String? _currentId; - String? _currentEvent; - String _currentData = ''; - int? _currentRetry; - - // Heartbeat and health monitoring - Timer? _heartbeatTimer; - DateTime _lastDataReceived = DateTime.now(); - Duration _heartbeatTimeout = const Duration(seconds: 30); - bool _isClosed = false; - - // Recovery state - String? _lastEventId; - bool _reconnectRequested = false; - - Stream get stream => _controller.stream; - - // Events for monitoring connection health - final _heartbeatController = StreamController.broadcast(); - final _reconnectController = StreamController.broadcast(); - - Stream get heartbeat => _heartbeatController.stream; - Stream get reconnectRequests => _reconnectController.stream; - - SSEParser({Duration? heartbeatTimeout}) { - if (heartbeatTimeout != null) { - _heartbeatTimeout = heartbeatTimeout; - } - _startHeartbeatTimer(); - } - - /// Feed raw text data to the parser - void feed(String chunk) { - if (_isClosed) return; - - _lastDataReceived = DateTime.now(); - _buffer += chunk; - _processBuffer(); - - // Reset heartbeat timer since we received data - _resetHeartbeatTimer(); - } - - void _startHeartbeatTimer() { - _heartbeatTimer?.cancel(); - _heartbeatTimer = Timer(_heartbeatTimeout, _onHeartbeatTimeout); - } - - void _resetHeartbeatTimer() { - if (!_isClosed) { - _startHeartbeatTimer(); - } - } - - void _onHeartbeatTimeout() { - debugPrint('SSEParser: Heartbeat timeout - no data received in ${_heartbeatTimeout.inSeconds}s'); - - if (!_isClosed) { - // Emit heartbeat timeout event - _heartbeatController.add(null); - - // Request reconnection with last event ID for recovery - _reconnectRequested = true; - _reconnectController.add(_lastEventId); - } - } - - /// Process buffered data and emit events - void _processBuffer() { - try { - // Handle potential Unicode boundary issues by checking for incomplete characters - if (_buffer.isNotEmpty && _hasIncompleteUnicode(_buffer)) { - // Keep buffer intact if it might contain incomplete Unicode - return; - } - - // Split by newlines but keep the last incomplete line - final lines = _buffer.split('\n'); - - // Keep the last line in buffer if it doesn't end with newline - if (!_buffer.endsWith('\n')) { - _buffer = lines.removeLast(); - } else { - _buffer = ''; - } - - for (final line in lines) { - _processLine(line); - } - } catch (e) { - debugPrint('SSEParser: Error processing buffer: $e'); - // Reset buffer on parsing error to prevent cascading failures - _buffer = ''; - } - } - - bool _hasIncompleteUnicode(String text) { - if (text.isEmpty) return false; - - // Check if the last few characters might be incomplete Unicode - // This is a simple heuristic - in practice, Dart's UTF-8 decoder handles this - final lastChar = text.codeUnitAt(text.length - 1); - - // If it's a high surrogate, we might be missing the low surrogate - return lastChar >= 0xD800 && lastChar <= 0xDBFF; - } - - /// Process a single line according to SSE spec - void _processLine(String line) { - // Handle carriage return if present (some servers use \r\n) - final cleanLine = line.replaceAll('\r', ''); - - // Empty line signals end of event - if (cleanLine.trim().isEmpty) { - if (_currentData.isNotEmpty) { - _emitEvent(); - } - _resetCurrentEvent(); - return; - } - - // Comment line (starts with :) - these serve as keep-alives - if (cleanLine.startsWith(':')) { - // Treat comments as heartbeat signals - _lastDataReceived = DateTime.now(); - _resetHeartbeatTimer(); - - // Log processing indicators but don't spam debug output - if (cleanLine.contains('OPENROUTER') && kDebugMode) { - debugPrint('SSEParser: OpenRouter processing...'); - } else if (cleanLine.contains('PROCESSING') && kDebugMode) { - debugPrint('SSEParser: Server processing...'); - } - return; - } - - // Parse field and value - final colonIndex = cleanLine.indexOf(':'); - String field; - String value; - - if (colonIndex == -1) { - field = cleanLine; - value = ''; - } else { - field = cleanLine.substring(0, colonIndex); - value = cleanLine.substring(colonIndex + 1); - // Remove leading space from value if present - if (value.startsWith(' ')) { - value = value.substring(1); - } - } - - // Process field according to SSE spec - switch (field) { - case 'data': - if (_currentData.isNotEmpty) { - _currentData += '\n'; - } - _currentData += value; - break; - - case 'event': - _currentEvent = value; - break; - - case 'id': - _currentId = value; - _lastEventId = value; // Track for reconnection - break; - - case 'retry': - final retryValue = int.tryParse(value); - if (retryValue != null) { - _currentRetry = retryValue; - } - break; - - default: - // Ignore unknown fields - break; - } - } - - /// Emit the current event - void _emitEvent() { - if (_isClosed) return; - - try { - final event = SSEEvent( - id: _currentId, - event: _currentEvent, - data: _currentData, - retry: _currentRetry, - ); - - _controller.add(event); - - // Track last event ID for potential reconnection - if (_currentId != null) { - _lastEventId = _currentId; - } - - } catch (e) { - debugPrint('SSEParser: Error emitting event: $e'); - _controller.addError(e); - } - } - - /// Reset current event state - void _resetCurrentEvent() { - _currentEvent = null; - _currentData = ''; - // Note: id and retry are not reset per SSE spec - } - - /// Close the parser - void close() { - if (_isClosed) return; - _isClosed = true; - - // Cancel heartbeat timer - _heartbeatTimer?.cancel(); - _heartbeatTimer = null; - - // Emit any remaining data - if (_currentData.isNotEmpty) { - _emitEvent(); - } - - // Close controllers - _controller.close(); - _heartbeatController.close(); - _reconnectController.close(); - } - - /// Get the last event ID for reconnection - String? get lastEventId => _lastEventId; - - /// Check if parser is closed - bool get isClosed => _isClosed; - - /// Check if reconnection was requested due to timeout - bool get reconnectRequested => _reconnectRequested; - - /// Reset reconnect flag (call when reconnection is handled) - void resetReconnectFlag() { - _reconnectRequested = false; - } - - /// Get time since last data was received - Duration get timeSinceLastData => DateTime.now().difference(_lastDataReceived); - - /// Parse SSE events from a stream of bytes with robust error handling - static Stream parseStream( - Stream> byteStream, { - Duration? heartbeatTimeout, - }) { - final parser = SSEParser(heartbeatTimeout: heartbeatTimeout); - - // Convert bytes to text and feed to parser with error recovery - StreamSubscription? subscription; - - subscription = byteStream - .transform(utf8.decoder) - .listen( - (chunk) { - try { - parser.feed(chunk); - } catch (e) { - debugPrint('SSEParser: Error feeding chunk: $e'); - // Don't propagate feed errors - just skip the problematic chunk - } - }, - onDone: () => parser.close(), - onError: (error) { - debugPrint('SSEParser: Stream error: $error'); - parser._controller.addError(error); - }, - cancelOnError: false, // Continue processing despite errors - ); - - // Clean up subscription when parser is closed - parser._controller.onCancel = () { - subscription?.cancel(); - }; - - return parser.stream; - } -} - -/// Transform a text stream into SSE events with heartbeat monitoring -class SSETransformer extends StreamTransformerBase { - final Duration? heartbeatTimeout; - - const SSETransformer({this.heartbeatTimeout}); - - @override - Stream bind(Stream stream) { - final parser = SSEParser(heartbeatTimeout: heartbeatTimeout); - - StreamSubscription? subscription; - - subscription = stream.listen( - (chunk) { - try { - parser.feed(chunk); - } catch (e) { - debugPrint('SSETransformer: Error feeding chunk: $e'); - // Continue processing despite errors - } - }, - onDone: () => parser.close(), - onError: (error) { - debugPrint('SSETransformer: Stream error: $error'); - parser._controller.addError(error); - }, - cancelOnError: false, - ); - - // Clean up subscription when parser is closed - parser._controller.onCancel = () { - subscription?.cancel(); - }; - - return parser.stream; - } -} - -/// Enhanced SSE event with additional metadata for resilient streaming -class EnhancedSSEEvent extends SSEEvent { - final DateTime timestamp; - final int sequenceNumber; - final String? sessionId; - - EnhancedSSEEvent({ - required super.data, - super.id, - super.event, - super.retry, - required this.timestamp, - required this.sequenceNumber, - this.sessionId, - }); - - factory EnhancedSSEEvent.fromSSEEvent( - SSEEvent event, { - required int sequenceNumber, - String? sessionId, - }) { - return EnhancedSSEEvent( - data: event.data, - id: event.id, - event: event.event, - retry: event.retry, - timestamp: DateTime.now(), - sequenceNumber: sequenceNumber, - sessionId: sessionId, - ); - } -} \ No newline at end of file diff --git a/lib/core/services/stream_recovery_service.dart b/lib/core/services/stream_recovery_service.dart deleted file mode 100644 index b7ed33d..0000000 --- a/lib/core/services/stream_recovery_service.dart +++ /dev/null @@ -1,237 +0,0 @@ -import 'dart:async'; -import 'dart:convert'; -import 'package:flutter/foundation.dart'; -import 'package:dio/dio.dart'; - -class StreamRecoveryService { - static const int maxRetries = 3; - static const Duration retryDelay = Duration(seconds: 2); - - // Recovery state for each stream - final Map _recoveryStates = {}; - - // Register a stream for recovery - void registerStream(String streamId, StreamRecoveryState state) { - _recoveryStates[streamId] = state; - debugPrint('StreamRecoveryService: Registered stream $streamId for recovery'); - } - - // Unregister a stream - void unregisterStream(String streamId) { - _recoveryStates.remove(streamId); - debugPrint('StreamRecoveryService: Unregistered stream $streamId'); - } - - // Attempt to recover a stream - Future?> recoverStream(String streamId) async { - final state = _recoveryStates[streamId]; - if (state == null) { - debugPrint('StreamRecoveryService: No recovery state for stream $streamId'); - return null; - } - - debugPrint('StreamRecoveryService: Attempting to recover stream $streamId'); - debugPrint('StreamRecoveryService: Last received index: ${state.lastReceivedIndex}'); - - int retryCount = 0; - while (retryCount < maxRetries) { - try { - // Create recovery request with continuation token - final recoveryData = { - ...state.originalRequest, - 'continue_from_index': state.lastReceivedIndex, - 'recovery_mode': true, - 'stream_id': streamId, - }; - - // Add any accumulated content to avoid duplication - if (state.accumulatedContent.isNotEmpty) { - recoveryData['accumulated_content'] = state.accumulatedContent; - } - - debugPrint('StreamRecoveryService: Recovery attempt ${retryCount + 1}/$maxRetries'); - - // Make recovery request - final dio = Dio(BaseOptions( - baseUrl: state.baseUrl, - connectTimeout: const Duration(seconds: 30), - receiveTimeout: null, // No timeout for streaming - headers: state.headers, - )); - - final response = await dio.post( - state.endpoint, - data: recoveryData, - options: Options( - headers: { - 'Accept': 'text/event-stream', - 'Cache-Control': 'no-cache', - }, - responseType: ResponseType.stream, - ), - ); - - if (response.statusCode == 200) { - debugPrint('StreamRecoveryService: Successfully recovered stream $streamId'); - - // Create new stream from recovered response - final stream = _processRecoveredStream( - response.data.stream, - state, - streamId, - ); - - return stream; - } - } catch (e) { - debugPrint('StreamRecoveryService: Recovery attempt failed: $e'); - retryCount++; - - if (retryCount < maxRetries) { - await Future.delayed(retryDelay * retryCount); - } - } - } - - debugPrint('StreamRecoveryService: Failed to recover stream $streamId after $maxRetries attempts'); - return null; - } - - // Process recovered stream and filter out duplicates - Stream _processRecoveredStream( - Stream> rawStream, - StreamRecoveryState state, - String streamId, - ) { - final controller = StreamController(); - - String buffer = ''; - bool skipUntilNewContent = state.lastReceivedIndex > 0; - int currentIndex = 0; - - rawStream.listen( - (chunk) { - final text = utf8.decode(chunk, allowMalformed: true); - buffer += text; - - // Process complete SSE events - while (buffer.contains('\n')) { - final lineEnd = buffer.indexOf('\n'); - final line = buffer.substring(0, lineEnd).trim(); - buffer = buffer.substring(lineEnd + 1); - - if (line.startsWith('data: ')) { - final data = line.substring(6); - - if (data == '[DONE]') { - controller.close(); - return; - } - - // Parse JSON data - try { - final json = jsonDecode(data); - - // Check if we should skip this content (already received) - if (skipUntilNewContent) { - currentIndex++; - if (currentIndex <= state.lastReceivedIndex) { - debugPrint('StreamRecoveryService: Skipping duplicate content at index $currentIndex'); - continue; - } - skipUntilNewContent = false; - } - - // Extract content from JSON - if (json['choices'] != null && json['choices'].isNotEmpty) { - final delta = json['choices'][0]['delta']; - if (delta != null && delta['content'] != null) { - final content = delta['content'] as String; - - // Update recovery state - state.lastReceivedIndex = currentIndex; - state.accumulatedContent += content; - - // Emit recovered content - controller.add(content); - currentIndex++; - } - } - } catch (e) { - debugPrint('StreamRecoveryService: Error parsing recovered data: $e'); - } - } - } - }, - onDone: () { - debugPrint('StreamRecoveryService: Recovered stream completed'); - controller.close(); - unregisterStream(streamId); - }, - onError: (error) { - debugPrint('StreamRecoveryService: Recovered stream error: $error'); - controller.addError(error); - - // Attempt another recovery - Future.delayed(retryDelay, () async { - final recoveredStream = await recoverStream(streamId); - if (recoveredStream != null) { - recoveredStream.listen( - (data) => controller.add(data), - onDone: () => controller.close(), - onError: (e) => controller.addError(e), - ); - } else { - controller.close(); - } - }); - }, - ); - - return controller.stream; - } - - // Update recovery state with new content - void updateStreamProgress(String streamId, String content, int index) { - final state = _recoveryStates[streamId]; - if (state != null) { - state.lastReceivedIndex = index; - state.accumulatedContent += content; - } - } - - // Clear recovery state for a stream - void clearStreamState(String streamId) { - _recoveryStates.remove(streamId); - } -} - -// Recovery state for a stream -class StreamRecoveryState { - final String baseUrl; - final String endpoint; - final Map originalRequest; - final Map headers; - int lastReceivedIndex; - String accumulatedContent; - DateTime lastActivity; - - StreamRecoveryState({ - required this.baseUrl, - required this.endpoint, - required this.originalRequest, - required this.headers, - this.lastReceivedIndex = 0, - this.accumulatedContent = '', - }) : lastActivity = DateTime.now(); - - // Check if stream is stale (no activity for too long) - bool get isStale { - return DateTime.now().difference(lastActivity).inMinutes > 5; - } - - // Update activity timestamp - void updateActivity() { - lastActivity = DateTime.now(); - } -} \ No newline at end of file