fix: tool calling

This commit is contained in:
cogwheel0
2025-09-01 16:28:49 +05:30
parent 7daf331daf
commit d801fe9371
4 changed files with 369 additions and 67 deletions

View File

@@ -11,6 +11,7 @@ import '../../../core/providers/app_providers.dart';
import '../../../core/auth/auth_state_manager.dart';
import '../../../core/utils/stream_chunker.dart';
import '../../../core/services/persistent_streaming_service.dart';
import '../../../core/utils/debug_logger.dart';
import '../services/reviewer_mode_service.dart';
// Chat messages for current conversation
@@ -408,6 +409,10 @@ Future<void> regenerateMessage(
);
ref.read(chatMessagesProvider.notifier).addMessage(assistantMessage);
// Reviewer mode: no immediate tool preview (no tool context)
// Reviewer mode: no immediate tool preview (no tool context)
// Use canned response for regeneration
final responseText = ReviewerModeService.generateResponse(
userMessage: userMessageContent,
@@ -1064,7 +1069,9 @@ Future<void> _sendMessageInternal(
// If socket is available, start listening for chat-events immediately
// For background-tools flow (when socket session is present), socket is the primary stream.
// In that case, do NOT suppress socket content.
bool suppressSocketContent = (socketSessionId == null); // only suppress when using SSE stream
// Suppress socket TEXT content when we already have a stream (SSE or polling)
// but DO allow tool_call status via socket to surface tiles immediately.
bool suppressSocketContent = (socketSessionId == null); // text-only suppression
if (socketService != null) {
void chatHandler(Map<String, dynamic> ev) {
try {
@@ -1072,10 +1079,12 @@ Future<void> _sendMessageInternal(
if (data == null) return;
final type = data['type'];
final payload = data['data'];
DebugLogger.stream('Socket chat-events: type=$type');
if (type == 'chat:completion' && payload != null) {
if (payload is Map<String, dynamic>) {
// Provider may emit tool_calls at the top level
if (!suppressSocketContent && payload.containsKey('tool_calls')) {
// Always surface tool_calls status from socket for instant tiles
if (payload.containsKey('tool_calls')) {
final tc = payload['tool_calls'];
if (tc is List) {
for (final call in tc) {
@@ -1146,19 +1155,218 @@ Future<void> _sendMessageInternal(
}
}
if (payload['done'] == true) {
// Do not force finish here to avoid cutting off active streams.
// Just stop listening to further socket events for this session.
// Stop listening to further socket events for this session.
try { socketService.offChatEvents(); } catch (_) {}
// If no content was rendered yet, fetch final assistant message from server
final msgs = ref.read(chatMessagesProvider);
if (msgs.isNotEmpty && msgs.last.role == 'assistant') {
final lastContent = msgs.last.content.trim();
if (lastContent.isEmpty) {
final apiSvc = ref.read(apiServiceProvider);
final chatId = activeConversation?.id;
final msgId = assistantMessageId;
if (apiSvc != null && chatId != null && chatId.isNotEmpty) {
Future.microtask(() async {
try {
final resp = await apiSvc.dio.get('/api/v1/chats/' + chatId);
final data = resp.data as Map<String, dynamic>;
String content = '';
final chatObj = data['chat'] as Map<String, dynamic>?;
if (chatObj != null) {
// Prefer chat.messages list
final list = chatObj['messages'];
if (list is List) {
final target = list.firstWhere(
(m) => (m is Map && (m['id']?.toString() == msgId)),
orElse: () => null,
);
if (target != null) {
final rawContent = (target as Map)['content'];
if (rawContent is String) {
content = rawContent;
} else if (rawContent is List) {
final textItem = rawContent.firstWhere(
(i) => i is Map && i['type'] == 'text',
orElse: () => null,
);
if (textItem != null) {
content = textItem['text']?.toString() ?? '';
}
}
}
}
// Fallback to history map
if (content.isEmpty) {
final history = chatObj['history'];
if (history is Map && history['messages'] is Map) {
final Map<String, dynamic> messagesMap =
(history['messages'] as Map).cast<String, dynamic>();
final msg = messagesMap[msgId];
if (msg is Map) {
final rawContent = msg['content'];
if (rawContent is String) {
content = rawContent;
} else if (rawContent is List) {
final textItem = rawContent.firstWhere(
(i) => i is Map && i['type'] == 'text',
orElse: () => null,
);
if (textItem != null) {
content = textItem['text']?.toString() ?? '';
}
}
}
}
}
}
if (content.isNotEmpty) {
ref
.read(chatMessagesProvider.notifier)
.replaceLastMessageContent(content);
}
} catch (_) {
// Swallow; we'll still finish streaming
} finally {
ref.read(chatMessagesProvider.notifier).finishStreaming();
}
});
return; // Defer finish to microtask
}
}
}
// Normal path: finish now
ref.read(chatMessagesProvider.notifier).finishStreaming();
}
}
} else if (type == 'request:chat:completion' && payload != null) {
// Mirror web client's execute path: listen on provided dynamic channel
final channel = payload['channel'];
if (channel is String && channel.isNotEmpty) {
DebugLogger.stream('Socket request:chat:completion channel=$channel');
void channelLineHandler(dynamic line) {
try {
if (line is String) {
final s = line.trim();
DebugLogger.stream('Socket [' + channel + '] line=' + (s.length > 160 ? s.substring(0, 160) + '' : s));
if (s == '[DONE]' || s == 'DONE') {
socketService.offEvent(channel);
// Channel completed
ref.read(chatMessagesProvider.notifier).finishStreaming();
return;
}
if (s.startsWith('data:')) {
final dataStr = s.substring(5).trim();
if (dataStr == '[DONE]') {
socketService.offEvent(channel);
ref.read(chatMessagesProvider.notifier).finishStreaming();
return;
}
// Try to parse OpenAI-style delta JSON
try {
final Map<String, dynamic> j = jsonDecode(dataStr);
final choices = j['choices'];
if (choices is List && choices.isNotEmpty) {
final choice = choices.first;
final delta = choice is Map ? choice['delta'] : null;
if (delta is Map) {
if (delta.containsKey('content')) {
final c = delta['content']?.toString() ?? '';
if (c.isNotEmpty) {
DebugLogger.stream('Socket [' + channel + '] delta.content len=' + c.length.toString());
}
}
// Surface tool_calls status
if (delta.containsKey('tool_calls')) {
DebugLogger.stream('Socket [' + channel + '] delta.tool_calls detected');
final tc = delta['tool_calls'];
if (tc is List) {
for (final call in tc) {
if (call is Map<String, dynamic>) {
final fn = call['function'];
final name = (fn is Map && fn['name'] is String)
? fn['name'] as String
: null;
if (name is String && name.isNotEmpty) {
final status = '\n<details type="tool_calls" done="false" name="$name"><summary>Executing...</summary>\n</details>\n';
ref.read(chatMessagesProvider.notifier).appendToLastMessage(status);
}
}
}
}
}
// Append streamed content
final content = delta['content']?.toString() ?? '';
if (content.isNotEmpty) {
ref.read(chatMessagesProvider.notifier).appendToLastMessage(content);
}
}
}
} catch (_) {
// Non-JSON line: append as-is
if (s.isNotEmpty) {
ref.read(chatMessagesProvider.notifier).appendToLastMessage(s);
}
}
} else {
// Plain text line
if (s.isNotEmpty) {
ref.read(chatMessagesProvider.notifier).appendToLastMessage(s);
}
}
} else if (line is Map) {
// If server sends { done: true } via channel
final done = line['done'] == true;
if (done) {
socketService.offEvent(channel);
ref.read(chatMessagesProvider.notifier).finishStreaming();
return;
}
}
} catch (_) {}
}
// Register dynamic channel listener
try {
socketService.onEvent(channel, channelLineHandler);
} catch (_) {}
}
} else if (type == 'execute:tool' && payload != null) {
// Show an executing tile immediately using provided tool info
try {
final name = payload['name']?.toString() ?? 'tool';
DebugLogger.stream('Socket execute:tool name=' + name);
final status = '\n<details type="tool_calls" done="false" name="$name"><summary>Executing...</summary>\n</details>\n';
ref.read(chatMessagesProvider.notifier).appendToLastMessage(status);
} catch (_) {}
}
} catch (_) {}
}
socketService.onChatEvents(chatHandler);
// Also mirror channel-events like the web client
void channelEventsHandler(Map<String, dynamic> ev) {
try {
final data = ev['data'];
if (data == null) return;
final type = data['type'];
final payload = data['data'];
DebugLogger.stream('Socket channel-events: type=' + type.toString());
// Handle generic channel progress messages if needed
if (type == 'message' && payload is Map) {
final content = payload['content']?.toString() ?? '';
if (content.isNotEmpty) {
ref.read(chatMessagesProvider.notifier).appendToLastMessage(content);
}
}
} catch (_) {}
}
socketService.onChannelEvents(channelEventsHandler);
Future.delayed(const Duration(seconds: 90), () {
try {
socketService.offChatEvents();
socketService.offChannelEvents();
} catch (_) {}
});
}
@@ -1432,8 +1640,11 @@ Future<void> _sendMessageInternal(
}
// Allow socket content again for future sessions (harmless if already false)
suppressSocketContent = false;
// Mark streaming as complete immediately for better UX
ref.read(chatMessagesProvider.notifier).finishStreaming();
// If this path was SSE-driven (no background socket), finish now.
// Otherwise keep streaming state until socket/dynamic channel signals done.
if (socketService == null) {
ref.read(chatMessagesProvider.notifier).finishStreaming();
}
// Send chat completed notification to OpenWebUI
final messages = ref.read(chatMessagesProvider);