fix: tool calls streaming

This commit is contained in:
cogwheel0
2025-09-01 17:41:55 +05:30
parent 754ad6d444
commit 2c263cf866

View File

@@ -1069,16 +1069,13 @@ Future<void> _sendMessageInternal(
ref.read(chatMessagesProvider.notifier).addMessage(assistantMessage); ref.read(chatMessagesProvider.notifier).addMessage(assistantMessage);
// If socket is available, start listening for chat-events immediately // If socket is available, start listening for chat-events immediately
// For background-tools flow (when socket session is present), socket is the primary stream. // Background-tools flow (tools/tool servers) relies on socket/dynamic channel for
// In that case, do NOT suppress socket content. // streaming content. Allow socket TEXT in that mode. For pure SSE flows, suppress
// Suppress socket TEXT content when we already have a stream (SSE or polling) // socket TEXT to avoid duplicates (still surface tool_call status).
// but DO allow tool_call status via socket to surface tiles immediately. final bool isBackgroundToolsFlow =
// By default we already have an SSE/polling stream for content, (toolIdsForApi != null && toolIdsForApi.isNotEmpty) ||
// so suppress socket TEXT chunks to avoid duplicates. We'll still (toolServers != null && toolServers.isNotEmpty);
// surface tool_calls status via socket immediately. If the server bool suppressSocketContent = !isBackgroundToolsFlow; // allow socket text for tools
// switches us to a dynamic channel (request:chat:completion), we
// keep suppressing chat-events text but stream from that channel.
bool suppressSocketContent = true; // text-only suppression by default
bool usingDynamicChannel = false; // set true when server provides a channel bool usingDynamicChannel = false; // set true when server provides a channel
if (socketService != null) { if (socketService != null) {
void chatHandler(Map<String, dynamic> ev) { void chatHandler(Map<String, dynamic> ev) {
@@ -1288,6 +1285,7 @@ Future<void> _sendMessageInternal(
// Prefer dynamic channel for streaming content; suppress chat-events text to avoid duplicates // Prefer dynamic channel for streaming content; suppress chat-events text to avoid duplicates
suppressSocketContent = true; suppressSocketContent = true;
usingDynamicChannel = true; usingDynamicChannel = true;
usingDynamicChannel = true;
if (kSocketVerboseLogging) { if (kSocketVerboseLogging) {
DebugLogger.stream('Socket request:chat:completion channel=$channel'); DebugLogger.stream('Socket request:chat:completion channel=$channel');
} }
@@ -1725,10 +1723,9 @@ Future<void> _sendMessageInternal(
} }
// Allow socket content again for future sessions (harmless if already false) // Allow socket content again for future sessions (harmless if already false)
suppressSocketContent = false; suppressSocketContent = false;
// If this path was SSE-driven (no background socket), finish now. // If this path was SSE-driven (no background tools/dynamic channel), finish now.
// Otherwise keep streaming state until socket/dynamic channel signals done. // Otherwise keep streaming state until socket/dynamic channel signals done.
// We can safely finish on SSE completion when not using a dynamic channel. if (!usingDynamicChannel && !isBackgroundToolsFlow) {
if (!usingDynamicChannel) {
ref.read(chatMessagesProvider.notifier).finishStreaming(); ref.read(chatMessagesProvider.notifier).finishStreaming();
} }
@@ -1769,29 +1766,31 @@ Future<void> _sendMessageInternal(
formattedMessages.add(messageMap); formattedMessages.add(messageMap);
} }
// Send chat completed notification to OpenWebUI first // Only notify completion immediately for non-background SSE flows.
// Fire-and-forget with a short timeout; non-critical endpoint // For background tools/dynamic-channel flows, defer completion
try { // until the socket/dynamic channel signals done.
unawaited( if (!isBackgroundToolsFlow && !usingDynamicChannel) {
api try {
.sendChatCompleted( unawaited(
chatId: activeConversation.id, api
messageId: .sendChatCompleted(
assistantMessageId, // Use message ID from response chatId: activeConversation.id,
messages: formattedMessages, messageId:
model: selectedModel.id, assistantMessageId, // Use message ID from response
modelItem: modelItem, // Include model metadata messages: formattedMessages,
sessionId: sessionId, // Include session ID model: selectedModel.id,
) modelItem: modelItem, // Include model metadata
.timeout(const Duration(seconds: 3)) sessionId: sessionId, // Include session ID
.catchError((_) {}), )
); .timeout(const Duration(seconds: 3))
} catch (_) { .catchError((_) {}),
// Ignore );
} catch (_) {
// Ignore
}
} }
// Fetch the latest conversation state // Fetch the latest conversation state
try { try {
// Quick fetch to get the current state - no waiting for title generation // Quick fetch to get the current state - no waiting for title generation
final updatedConv = await api.getConversation( final updatedConv = await api.getConversation(