refactor: use background only flows

This commit is contained in:
cogwheel0
2025-09-26 13:59:28 +05:30
parent 5f03610f35
commit 3c959c83bf
2 changed files with 95 additions and 211 deletions

View File

@@ -2796,13 +2796,14 @@ class ApiService {
final Map<String, CancelToken> _streamCancelTokens = {};
final Map<String, String> _messagePersistentStreamIds = {};
// Send message with SSE streaming
// Returns a record with (stream, messageId, sessionId)
// Send message using the background flow (socket push + polling fallback).
// Returns a record with (stream, messageId, sessionId, socketSessionId, isBackgroundFlow)
({
Stream<String> stream,
String messageId,
String sessionId,
String? socketSessionId,
bool isBackgroundFlow,
})
sendMessage({
required List<Map<String, dynamic>> messages,
@@ -2877,9 +2878,12 @@ class ApiService {
}
}
// Build request data - minimal params for SSE to work
// OpenWebUI server doesn't support SSE with session_id/id parameters
final data = {
final bool hasBackgroundTasksPayload =
backgroundTasks != null && backgroundTasks.isNotEmpty;
// Build request data. Always request streamed responses so the backend can
// forward deltas over WebSocket when running in background task mode.
final data = <String, dynamic>{
'stream': true,
'model': model,
'messages': processedMessages,
@@ -2893,12 +2897,12 @@ class ApiService {
// Add feature flags if enabled
if (enableWebSearch) {
data['web_search'] = true;
_traceApi('Web search enabled in SSE request');
_traceApi('Web search enabled in streaming request');
}
if (enableImageGeneration) {
// Mirror web_search behavior for image generation
data['image_generation'] = true;
_traceApi('Image generation enabled in SSE request');
_traceApi('Image generation enabled in streaming request');
}
if (enableWebSearch || enableImageGeneration) {
@@ -2911,14 +2915,6 @@ class ApiService {
};
}
if (backgroundTasks != null && backgroundTasks.isNotEmpty) {
data['background_tasks'] = backgroundTasks;
}
if (socketSessionId != null && socketSessionId.isNotEmpty) {
data['session_id'] = socketSessionId;
}
data['id'] = messageId;
// No default reasoning parameters included; providers handle thinking UIs natively.
@@ -2926,7 +2922,7 @@ class ApiService {
// Add tool_ids if provided (Open-WebUI expects tool_ids as array of strings)
if (toolIds != null && toolIds.isNotEmpty) {
data['tool_ids'] = toolIds;
_traceApi('Including tool_ids in SSE request: $toolIds');
_traceApi('Including tool_ids in streaming request: $toolIds');
// Hint server to use native function calling when tools are selected
// This enables provider-native tool execution paths and consistent UI events
@@ -2953,35 +2949,25 @@ class ApiService {
_traceApi('Including non-image files in request: ${allFiles.length}');
}
// Don't add session_id or id - they break SSE streaming!
// The server falls back to task-based async when these are present
_traceApi('Starting SSE streaming request');
_traceApi('Preparing chat send request (backgroundFlow: true)');
_traceApi('Model: $model');
_traceApi('Message count: ${processedMessages.length}');
// Debug the data being sent
_traceApi('SSE request data keys (pre-bg): ${data.keys.toList()}');
_traceApi('Request data keys (pre-dispatch): ${data.keys.toList()}');
_traceApi('Has background_tasks: ${data.containsKey('background_tasks')}');
_traceApi('Has session_id: ${data.containsKey('session_id')}');
_traceApi('background_tasks value: ${data['background_tasks']}');
_traceApi('session_id value: ${data['session_id']}');
_traceApi('id value: ${data['id']}');
_traceApi(
'Has background_tasks (pre-bg): ${data.containsKey('background_tasks')}',
'Forcing background flow (hasBackgroundTasks: '
'$hasBackgroundTasksPayload, tools: ${toolIds?.isNotEmpty == true}, '
'webSearch: $enableWebSearch, imageGen: $enableImageGeneration, '
'sessionOverride: ${sessionIdOverride != null && sessionIdOverride.isNotEmpty})',
);
_traceApi('Has session_id (pre-bg): ${data.containsKey('session_id')}');
_traceApi('background_tasks value (pre-bg): ${data['background_tasks']}');
_traceApi('session_id value (pre-bg): ${data['session_id']}');
_traceApi('id value (pre-bg): ${data['id']}');
// Decide whether to use background task flow.
// Use background task mode when tools, web_search, image_generation are enabled,
// or when an explicit dynamic socket session binding is requested.
final bool useBackgroundTasks =
(toolIds != null && toolIds.isNotEmpty) ||
enableWebSearch ||
enableImageGeneration ||
(sessionIdOverride != null && sessionIdOverride.isNotEmpty);
// Use background flow only when required; otherwise prefer SSE even with chat_id.
// SSE must not include session_id/id to avoid server falling back to task mode.
if (useBackgroundTasks) {
// Attach identifiers to trigger background task processing on the server
data['session_id'] = sessionId;
data['id'] = messageId;
@@ -3001,12 +2987,10 @@ class ApiService {
_traceApi(
'Has tool_ids: ${data.containsKey('tool_ids')} -> ${data['tool_ids']}',
);
_traceApi(
'Has background_tasks: ${data.containsKey('background_tasks')}',
);
_traceApi('Has background_tasks: ${data.containsKey('background_tasks')}');
_traceApi('Initiating background tools flow (task-based)');
_traceApi('Posting to /api/chat/completions (no SSE)');
_traceApi('Posting to /api/chat/completions');
// Fire in background; poll chat for updates and stream deltas to UI
() async {
@@ -3019,9 +3003,16 @@ class ApiService {
_traceApi('Background task created: $taskId');
// If no session/socket provided, fall back to polling for updates.
if (sessionIdOverride == null || sessionIdOverride.isEmpty) {
final pollChatId = (conversationId != null && conversationId.isNotEmpty)
? conversationId
: null;
final requiresPolling =
sessionIdOverride == null || sessionIdOverride.isEmpty;
if (requiresPolling && pollChatId != null) {
final chatId = pollChatId;
await _pollChatForMessageUpdates(
chatId: conversationId!,
chatId: chatId,
messageId: messageId,
streamController: streamController,
);
@@ -3036,120 +3027,13 @@ class ApiService {
if (!streamController.isClosed) streamController.close();
}
}();
} else {
// SSE streaming path for low-latency pure completions (no background tasks)
() async {
try {
// Request SSE stream; avoid session_id/id which break streaming
final resp = await _dio.post(
'/api/chat/completions',
data: data,
options: Options(
responseType: ResponseType.stream,
headers: {'Accept': 'text/event-stream'},
),
);
// Parse SSE lines and forward deltas to the controller
final body = resp.data;
// Dio returns ResponseBody for stream responseType
final stream = (body is ResponseBody) ? body.stream : null;
if (stream == null) {
// Fallback: if server responded JSON, emit once
try {
final dataStr = resp.data?.toString() ?? '';
if (dataStr.isNotEmpty && !streamController.isClosed) {
streamController.add(dataStr);
}
} catch (_) {}
if (!streamController.isClosed) streamController.close();
return;
}
String buffer = '';
late final StreamSubscription<List<int>> sub;
sub = stream.listen(
(chunk) {
try {
buffer += utf8.decode(chunk, allowMalformed: true);
// Process complete lines; keep remainder in buffer
final parts = buffer.split('\n');
buffer = parts.isNotEmpty ? parts.removeLast() : '';
for (final raw in parts) {
final line = raw.trim();
if (line.isEmpty) continue;
if (line == 'data: [DONE]') {
try {
if (!streamController.isClosed) streamController.close();
} catch (_) {}
sub.cancel();
return;
}
if (line.startsWith('data:')) {
final payloadStr = line.substring(5).trim();
if (payloadStr.isEmpty) continue;
try {
final Map<String, dynamic> j = jsonDecode(payloadStr);
final choices = j['choices'];
if (choices is List && choices.isNotEmpty) {
final choice = choices.first;
final delta = choice is Map ? choice['delta'] : null;
if (delta is Map) {
final content = delta['content']?.toString() ?? '';
if (content.isNotEmpty &&
!streamController.isClosed) {
streamController.add(content);
}
} else {
final message = (choice is Map)
? (choice['message']?['content']?.toString() ??
'')
: '';
if (message.isNotEmpty &&
!streamController.isClosed) {
streamController.add(message);
}
}
} else if (j['content'] is String) {
final content = j['content'] as String;
if (content.isNotEmpty && !streamController.isClosed) {
streamController.add(content);
}
}
} catch (_) {
// Non-JSON payload; forward as-is
if (!streamController.isClosed) {
streamController.add(payloadStr);
}
}
}
}
} catch (_) {}
},
onDone: () {
try {
if (!streamController.isClosed) streamController.close();
} catch (_) {}
},
onError: (_) {
try {
if (!streamController.isClosed) streamController.close();
} catch (_) {}
},
cancelOnError: true,
);
} catch (e) {
_traceApi('SSE streaming failed: $e');
if (!streamController.isClosed) streamController.close();
}
}();
}
return (
stream: streamController.stream,
messageId: messageId,
sessionId: sessionId,
socketSessionId: socketSessionId,
isBackgroundFlow: true,
);
}

View File

@@ -1251,7 +1251,8 @@ Future<void> regenerateMessage(
isBackgroundToolsFlowPre ||
isBackgroundWebSearchPre ||
imageGenerationEnabled;
final bool passSocketSession = wantSessionBinding && isBackgroundFlowPre;
final bool passSocketSession =
wantSessionBinding && (isBackgroundFlowPre || bgTasks.isNotEmpty);
final response = api!.sendMessage(
messages: conversationMessages,
model: selectedModel.id,
@@ -1272,12 +1273,7 @@ Future<void> regenerateMessage(
final effectiveSessionId =
response.socketSessionId ?? socketSessionId ?? sessionId;
// New unified streaming path via helper; bypass old inline socket block
final bool isBackgroundFlow =
isBackgroundToolsFlowPre ||
isBackgroundWebSearchPre ||
imageGenerationEnabled ||
wantSessionBinding;
final bool isBackgroundFlow = response.isBackgroundFlow;
try {
ref.read(chatMessagesProvider.notifier).updateLastMessageWithFunction((
m,
@@ -1782,6 +1778,13 @@ Future<void> _sendMessageInternal(
(toolServers != null && toolServers.isNotEmpty);
final bool isBackgroundWebSearchPre = webSearchEnabled;
final bool shouldBindSession =
wantSessionBinding &&
(isBackgroundToolsFlowPre ||
isBackgroundWebSearchPre ||
imageGenerationEnabled ||
bgTasks.isNotEmpty);
final response = await api.sendMessage(
messages: conversationMessages,
model: selectedModel.id,
@@ -1793,7 +1796,7 @@ Future<void> _sendMessageInternal(
modelItem: modelItem,
// Bind to Socket session whenever available so the server can push
// streaming updates to this client (improves first-turn streaming).
sessionIdOverride: wantSessionBinding ? socketSessionId : null,
sessionIdOverride: shouldBindSession ? socketSessionId : null,
socketSessionId: socketSessionId,
toolServers: toolServers,
backgroundTasks: bgTasks,
@@ -1806,10 +1809,7 @@ Future<void> _sendMessageInternal(
response.socketSessionId ?? socketSessionId ?? sessionId;
// Use unified streaming helper for SSE/WebSocket handling
final bool isBackgroundFlow =
isBackgroundToolsFlowPre ||
isBackgroundWebSearchPre ||
wantSessionBinding;
final bool isBackgroundFlow = response.isBackgroundFlow;
try {
ref.read(chatMessagesProvider.notifier).updateLastMessageWithFunction((