From 3c959c83bfc929d65ccfa08f44f843a1580de336 Mon Sep 17 00:00:00 2001 From: cogwheel0 <172976095+cogwheel0@users.noreply.github.com> Date: Fri, 26 Sep 2025 13:59:28 +0530 Subject: [PATCH] refactor: use background only flows --- lib/core/services/api_service.dart | 282 ++++++------------ .../chat/providers/chat_providers.dart | 24 +- 2 files changed, 95 insertions(+), 211 deletions(-) diff --git a/lib/core/services/api_service.dart b/lib/core/services/api_service.dart index 63b879e..8814f0f 100644 --- a/lib/core/services/api_service.dart +++ b/lib/core/services/api_service.dart @@ -2796,13 +2796,14 @@ class ApiService { final Map _streamCancelTokens = {}; final Map _messagePersistentStreamIds = {}; - // Send message with SSE streaming - // Returns a record with (stream, messageId, sessionId) + // Send message using the background flow (socket push + polling fallback). + // Returns a record with (stream, messageId, sessionId, socketSessionId, isBackgroundFlow) ({ Stream stream, String messageId, String sessionId, String? socketSessionId, + bool isBackgroundFlow, }) sendMessage({ required List> messages, @@ -2877,9 +2878,12 @@ class ApiService { } } - // Build request data - minimal params for SSE to work - // OpenWebUI server doesn't support SSE with session_id/id parameters - final data = { + final bool hasBackgroundTasksPayload = + backgroundTasks != null && backgroundTasks.isNotEmpty; + + // Build request data. Always request streamed responses so the backend can + // forward deltas over WebSocket when running in background task mode. + final data = { 'stream': true, 'model': model, 'messages': processedMessages, @@ -2893,12 +2897,12 @@ class ApiService { // Add feature flags if enabled if (enableWebSearch) { data['web_search'] = true; - _traceApi('Web search enabled in SSE request'); + _traceApi('Web search enabled in streaming request'); } if (enableImageGeneration) { // Mirror web_search behavior for image generation data['image_generation'] = true; - _traceApi('Image generation enabled in SSE request'); + _traceApi('Image generation enabled in streaming request'); } if (enableWebSearch || enableImageGeneration) { @@ -2911,14 +2915,6 @@ class ApiService { }; } - if (backgroundTasks != null && backgroundTasks.isNotEmpty) { - data['background_tasks'] = backgroundTasks; - } - - if (socketSessionId != null && socketSessionId.isNotEmpty) { - data['session_id'] = socketSessionId; - } - data['id'] = messageId; // No default reasoning parameters included; providers handle thinking UIs natively. @@ -2926,7 +2922,7 @@ class ApiService { // Add tool_ids if provided (Open-WebUI expects tool_ids as array of strings) if (toolIds != null && toolIds.isNotEmpty) { data['tool_ids'] = toolIds; - _traceApi('Including tool_ids in SSE request: $toolIds'); + _traceApi('Including tool_ids in streaming request: $toolIds'); // Hint server to use native function calling when tools are selected // This enables provider-native tool execution paths and consistent UI events @@ -2953,203 +2949,91 @@ class ApiService { _traceApi('Including non-image files in request: ${allFiles.length}'); } - // Don't add session_id or id - they break SSE streaming! - // The server falls back to task-based async when these are present - - _traceApi('Starting SSE streaming request'); + _traceApi('Preparing chat send request (backgroundFlow: true)'); _traceApi('Model: $model'); _traceApi('Message count: ${processedMessages.length}'); // Debug the data being sent - _traceApi('SSE request data keys (pre-bg): ${data.keys.toList()}'); + _traceApi('Request data keys (pre-dispatch): ${data.keys.toList()}'); + _traceApi('Has background_tasks: ${data.containsKey('background_tasks')}'); + _traceApi('Has session_id: ${data.containsKey('session_id')}'); + _traceApi('background_tasks value: ${data['background_tasks']}'); + _traceApi('session_id value: ${data['session_id']}'); + _traceApi('id value: ${data['id']}'); + _traceApi( - 'Has background_tasks (pre-bg): ${data.containsKey('background_tasks')}', + 'Forcing background flow (hasBackgroundTasks: ' + '$hasBackgroundTasksPayload, tools: ${toolIds?.isNotEmpty == true}, ' + 'webSearch: $enableWebSearch, imageGen: $enableImageGeneration, ' + 'sessionOverride: ${sessionIdOverride != null && sessionIdOverride.isNotEmpty})', ); - _traceApi('Has session_id (pre-bg): ${data.containsKey('session_id')}'); - _traceApi('background_tasks value (pre-bg): ${data['background_tasks']}'); - _traceApi('session_id value (pre-bg): ${data['session_id']}'); - _traceApi('id value (pre-bg): ${data['id']}'); - // Decide whether to use background task flow. - // Use background task mode when tools, web_search, image_generation are enabled, - // or when an explicit dynamic socket session binding is requested. - final bool useBackgroundTasks = - (toolIds != null && toolIds.isNotEmpty) || - enableWebSearch || - enableImageGeneration || - (sessionIdOverride != null && sessionIdOverride.isNotEmpty); - - // Use background flow only when required; otherwise prefer SSE even with chat_id. - // SSE must not include session_id/id to avoid server falling back to task mode. - if (useBackgroundTasks) { - // Attach identifiers to trigger background task processing on the server - data['session_id'] = sessionId; - data['id'] = messageId; - if (conversationId != null) { - data['chat_id'] = conversationId; - } - - // Attach background_tasks if provided - if (backgroundTasks != null && backgroundTasks.isNotEmpty) { - data['background_tasks'] = backgroundTasks; - } - - // Extra diagnostics to confirm dynamic-channel payload - _traceApi('Background flow payload keys: ${data.keys.toList()}'); - _traceApi('Using session_id: $sessionId'); - _traceApi('Using message id: $messageId'); - _traceApi( - 'Has tool_ids: ${data.containsKey('tool_ids')} -> ${data['tool_ids']}', - ); - _traceApi( - 'Has background_tasks: ${data.containsKey('background_tasks')}', - ); - - _traceApi('Initiating background tools flow (task-based)'); - _traceApi('Posting to /api/chat/completions (no SSE)'); - - // Fire in background; poll chat for updates and stream deltas to UI - () async { - try { - final resp = await _dio.post('/api/chat/completions', data: data); - final respData = resp.data; - final taskId = (respData is Map) - ? (respData['task_id']?.toString()) - : null; - _traceApi('Background task created: $taskId'); - - // If no session/socket provided, fall back to polling for updates. - if (sessionIdOverride == null || sessionIdOverride.isEmpty) { - await _pollChatForMessageUpdates( - chatId: conversationId!, - messageId: messageId, - streamController: streamController, - ); - } else { - // Close the controller so listeners don't hang waiting for chunks - if (!streamController.isClosed) { - streamController.close(); - } - } - } catch (e) { - _traceApi('Background tools flow failed: $e'); - if (!streamController.isClosed) streamController.close(); - } - }(); - } else { - // SSE streaming path for low-latency pure completions (no background tasks) - () async { - try { - // Request SSE stream; avoid session_id/id which break streaming - final resp = await _dio.post( - '/api/chat/completions', - data: data, - options: Options( - responseType: ResponseType.stream, - headers: {'Accept': 'text/event-stream'}, - ), - ); - - // Parse SSE lines and forward deltas to the controller - final body = resp.data; - // Dio returns ResponseBody for stream responseType - final stream = (body is ResponseBody) ? body.stream : null; - if (stream == null) { - // Fallback: if server responded JSON, emit once - try { - final dataStr = resp.data?.toString() ?? ''; - if (dataStr.isNotEmpty && !streamController.isClosed) { - streamController.add(dataStr); - } - } catch (_) {} - if (!streamController.isClosed) streamController.close(); - return; - } - - String buffer = ''; - late final StreamSubscription> sub; - sub = stream.listen( - (chunk) { - try { - buffer += utf8.decode(chunk, allowMalformed: true); - // Process complete lines; keep remainder in buffer - final parts = buffer.split('\n'); - buffer = parts.isNotEmpty ? parts.removeLast() : ''; - for (final raw in parts) { - final line = raw.trim(); - if (line.isEmpty) continue; - if (line == 'data: [DONE]') { - try { - if (!streamController.isClosed) streamController.close(); - } catch (_) {} - sub.cancel(); - return; - } - if (line.startsWith('data:')) { - final payloadStr = line.substring(5).trim(); - if (payloadStr.isEmpty) continue; - try { - final Map j = jsonDecode(payloadStr); - final choices = j['choices']; - if (choices is List && choices.isNotEmpty) { - final choice = choices.first; - final delta = choice is Map ? choice['delta'] : null; - if (delta is Map) { - final content = delta['content']?.toString() ?? ''; - if (content.isNotEmpty && - !streamController.isClosed) { - streamController.add(content); - } - } else { - final message = (choice is Map) - ? (choice['message']?['content']?.toString() ?? - '') - : ''; - if (message.isNotEmpty && - !streamController.isClosed) { - streamController.add(message); - } - } - } else if (j['content'] is String) { - final content = j['content'] as String; - if (content.isNotEmpty && !streamController.isClosed) { - streamController.add(content); - } - } - } catch (_) { - // Non-JSON payload; forward as-is - if (!streamController.isClosed) { - streamController.add(payloadStr); - } - } - } - } - } catch (_) {} - }, - onDone: () { - try { - if (!streamController.isClosed) streamController.close(); - } catch (_) {} - }, - onError: (_) { - try { - if (!streamController.isClosed) streamController.close(); - } catch (_) {} - }, - cancelOnError: true, - ); - } catch (e) { - _traceApi('SSE streaming failed: $e'); - if (!streamController.isClosed) streamController.close(); - } - }(); + // Attach identifiers to trigger background task processing on the server + data['session_id'] = sessionId; + data['id'] = messageId; + if (conversationId != null) { + data['chat_id'] = conversationId; } + // Attach background_tasks if provided + if (backgroundTasks != null && backgroundTasks.isNotEmpty) { + data['background_tasks'] = backgroundTasks; + } + + // Extra diagnostics to confirm dynamic-channel payload + _traceApi('Background flow payload keys: ${data.keys.toList()}'); + _traceApi('Using session_id: $sessionId'); + _traceApi('Using message id: $messageId'); + _traceApi( + 'Has tool_ids: ${data.containsKey('tool_ids')} -> ${data['tool_ids']}', + ); + _traceApi('Has background_tasks: ${data.containsKey('background_tasks')}'); + + _traceApi('Initiating background tools flow (task-based)'); + _traceApi('Posting to /api/chat/completions'); + + // Fire in background; poll chat for updates and stream deltas to UI + () async { + try { + final resp = await _dio.post('/api/chat/completions', data: data); + final respData = resp.data; + final taskId = (respData is Map) + ? (respData['task_id']?.toString()) + : null; + _traceApi('Background task created: $taskId'); + + // If no session/socket provided, fall back to polling for updates. + final pollChatId = (conversationId != null && conversationId.isNotEmpty) + ? conversationId + : null; + final requiresPolling = + sessionIdOverride == null || sessionIdOverride.isEmpty; + + if (requiresPolling && pollChatId != null) { + final chatId = pollChatId; + await _pollChatForMessageUpdates( + chatId: chatId, + messageId: messageId, + streamController: streamController, + ); + } else { + // Close the controller so listeners don't hang waiting for chunks + if (!streamController.isClosed) { + streamController.close(); + } + } + } catch (e) { + _traceApi('Background tools flow failed: $e'); + if (!streamController.isClosed) streamController.close(); + } + }(); + return ( stream: streamController.stream, messageId: messageId, sessionId: sessionId, socketSessionId: socketSessionId, + isBackgroundFlow: true, ); } diff --git a/lib/features/chat/providers/chat_providers.dart b/lib/features/chat/providers/chat_providers.dart index 68ab9c8..6c418e4 100644 --- a/lib/features/chat/providers/chat_providers.dart +++ b/lib/features/chat/providers/chat_providers.dart @@ -1251,7 +1251,8 @@ Future regenerateMessage( isBackgroundToolsFlowPre || isBackgroundWebSearchPre || imageGenerationEnabled; - final bool passSocketSession = wantSessionBinding && isBackgroundFlowPre; + final bool passSocketSession = + wantSessionBinding && (isBackgroundFlowPre || bgTasks.isNotEmpty); final response = api!.sendMessage( messages: conversationMessages, model: selectedModel.id, @@ -1272,12 +1273,7 @@ Future regenerateMessage( final effectiveSessionId = response.socketSessionId ?? socketSessionId ?? sessionId; - // New unified streaming path via helper; bypass old inline socket block - final bool isBackgroundFlow = - isBackgroundToolsFlowPre || - isBackgroundWebSearchPre || - imageGenerationEnabled || - wantSessionBinding; + final bool isBackgroundFlow = response.isBackgroundFlow; try { ref.read(chatMessagesProvider.notifier).updateLastMessageWithFunction(( m, @@ -1782,6 +1778,13 @@ Future _sendMessageInternal( (toolServers != null && toolServers.isNotEmpty); final bool isBackgroundWebSearchPre = webSearchEnabled; + final bool shouldBindSession = + wantSessionBinding && + (isBackgroundToolsFlowPre || + isBackgroundWebSearchPre || + imageGenerationEnabled || + bgTasks.isNotEmpty); + final response = await api.sendMessage( messages: conversationMessages, model: selectedModel.id, @@ -1793,7 +1796,7 @@ Future _sendMessageInternal( modelItem: modelItem, // Bind to Socket session whenever available so the server can push // streaming updates to this client (improves first-turn streaming). - sessionIdOverride: wantSessionBinding ? socketSessionId : null, + sessionIdOverride: shouldBindSession ? socketSessionId : null, socketSessionId: socketSessionId, toolServers: toolServers, backgroundTasks: bgTasks, @@ -1806,10 +1809,7 @@ Future _sendMessageInternal( response.socketSessionId ?? socketSessionId ?? sessionId; // Use unified streaming helper for SSE/WebSocket handling - final bool isBackgroundFlow = - isBackgroundToolsFlowPre || - isBackgroundWebSearchPre || - wantSessionBinding; + final bool isBackgroundFlow = response.isBackgroundFlow; try { ref.read(chatMessagesProvider.notifier).updateLastMessageWithFunction((