From d801fe9371320b0549290302820f88efcbb15bb0 Mon Sep 17 00:00:00 2001 From: cogwheel0 <172976095+cogwheel0@users.noreply.github.com> Date: Mon, 1 Sep 2025 16:28:49 +0530 Subject: [PATCH] fix: tool calling --- lib/core/services/socket_service.dart | 25 ++ lib/core/utils/tool_calls_parser.dart | 86 +++++-- .../chat/providers/chat_providers.dart | 223 +++++++++++++++++- .../widgets/assistant_message_widget.dart | 102 +++++--- 4 files changed, 369 insertions(+), 67 deletions(-) diff --git a/lib/core/services/socket_service.dart b/lib/core/services/socket_service.dart index 6c65b81..72a2711 100644 --- a/lib/core/services/socket_service.dart +++ b/lib/core/services/socket_service.dart @@ -69,10 +69,35 @@ class SocketService { }); } + // Subscribe to general channel events (server-broadcasted channel updates) + void onChannelEvents(void Function(Map event) handler) { + _socket?.on('channel-events', (data) { + try { + if (data is Map) { + handler(data); + } else if (data is Map) { + handler(Map.from(data)); + } + } catch (_) {} + }); + } + void offChatEvents() { _socket?.off('chat-events'); } + void offChannelEvents() { + _socket?.off('channel-events'); + } + + // Subscribe to an arbitrary socket.io event (used for dynamic tool channels) + void onEvent(String eventName, void Function(dynamic data) handler) { + _socket?.on(eventName, handler); + } + + void offEvent(String eventName) { + _socket?.off(eventName); + } void dispose() { try { _socket?.dispose(); diff --git a/lib/core/utils/tool_calls_parser.dart b/lib/core/utils/tool_calls_parser.dart index cab5655..f000733 100644 --- a/lib/core/utils/tool_calls_parser.dart +++ b/lib/core/utils/tool_calls_parser.dart @@ -34,6 +34,19 @@ class ToolCallsContent { /// Utility to parse
blocks from content class ToolCallsParser { + static String _unescapeHtml(String s) { + return s + .replaceAll('"', '"') + .replaceAll('"', '"') + .replaceAll(''', "'") + .replaceAll(''', "'") + .replaceAll('<', '<') + .replaceAll('<', '<') + .replaceAll('>', '>') + .replaceAll('>', '>') + .replaceAll('&', '&') + .replaceAll('&', '&'); + } /// Represents a mixed stream of text and tool-call entries in original order /// as they appeared in the content. static List? segments(String content) { @@ -59,12 +72,19 @@ class ToolCallsParser { // Find end of opening tag final openEnd = content.indexOf('>', start); if (openEnd == -1) { - // Malformed; append rest as text + // Malformed opening tag; append the rest as text and stop segs.add(ToolCallsSegment.text(content.substring(start))); break; } final openTag = content.substring(start, openEnd + 1); + // Parse attributes from opening tag immediately (to support streaming) + final attrs = {}; + final attrRegex = RegExp(r'(\w+)="(.*?)"'); + for (final m in attrRegex.allMatches(openTag)) { + attrs[m.group(1)!] = m.group(2) ?? ''; + } + // Find matching closing tag with nesting support int depth = 1; int i = openEnd + 1; @@ -81,28 +101,22 @@ class ToolCallsParser { } } - if (depth != 0) { - // Unclosed details; append the rest as text - segs.add(ToolCallsSegment.text(content.substring(start))); - break; - } + final isToolCalls = (attrs['type'] ?? '') == 'tool_calls'; - final fullMatch = content.substring(start, i); - - // Parse attributes from opening tag - final attrs = {}; - final attrRegex = RegExp(r'(\w+)="(.*?)"'); - for (final m in attrRegex.allMatches(openTag)) { - attrs[m.group(1)!] = m.group(2) ?? ''; - } - - if ((attrs['type'] ?? '') == 'tool_calls') { + if (isToolCalls) { + // Decode attributes for tool call tile dynamic _decode(String? s) { if (s == null || s.isEmpty) return null; try { - return json.decode(s); + final unescaped = _unescapeHtml(s); + return json.decode(unescaped); } catch (_) { - return s; + // If JSON decode fails, return unescaped string for display + try { + return _unescapeHtml(s); + } catch (_) { + return s; + } } } @@ -125,11 +139,26 @@ class ToolCallsParser { ), ), ); - } else { - segs.add(ToolCallsSegment.text(fullMatch)); + + // If details not closed yet, stop scanning (wait for more stream) + if (depth != 0) { + break; + } + + // If closed, advance index to the end of the block + index = i; + continue; } - index = i; + // Non-tool_calls: keep as text (full block) when closed; if not closed, append remainder and stop + if (depth != 0) { + segs.add(ToolCallsSegment.text(content.substring(start))); + break; + } else { + final fullMatch = content.substring(start, i); + segs.add(ToolCallsSegment.text(fullMatch)); + index = i; + } } return segs.isEmpty ? null : segs; @@ -139,6 +168,7 @@ class ToolCallsParser { static ToolCallsContent? parse(String content) { if (content.isEmpty || !content.contains(']*>[\s\S]*?<\/details>', + multiLine: true, + dotAll: true, + ), + '', + ) + .trim(); + if (cleaned.isNotEmpty) buf.write(cleaned); } } @@ -209,4 +250,3 @@ class ToolCallsSegment { bool get isToolCall => entry != null; } - diff --git a/lib/features/chat/providers/chat_providers.dart b/lib/features/chat/providers/chat_providers.dart index 8838eb0..503d90e 100644 --- a/lib/features/chat/providers/chat_providers.dart +++ b/lib/features/chat/providers/chat_providers.dart @@ -11,6 +11,7 @@ import '../../../core/providers/app_providers.dart'; import '../../../core/auth/auth_state_manager.dart'; import '../../../core/utils/stream_chunker.dart'; import '../../../core/services/persistent_streaming_service.dart'; +import '../../../core/utils/debug_logger.dart'; import '../services/reviewer_mode_service.dart'; // Chat messages for current conversation @@ -408,6 +409,10 @@ Future regenerateMessage( ); ref.read(chatMessagesProvider.notifier).addMessage(assistantMessage); + // Reviewer mode: no immediate tool preview (no tool context) + + // Reviewer mode: no immediate tool preview (no tool context) + // Use canned response for regeneration final responseText = ReviewerModeService.generateResponse( userMessage: userMessageContent, @@ -1064,7 +1069,9 @@ Future _sendMessageInternal( // If socket is available, start listening for chat-events immediately // For background-tools flow (when socket session is present), socket is the primary stream. // In that case, do NOT suppress socket content. - bool suppressSocketContent = (socketSessionId == null); // only suppress when using SSE stream + // Suppress socket TEXT content when we already have a stream (SSE or polling) + // but DO allow tool_call status via socket to surface tiles immediately. + bool suppressSocketContent = (socketSessionId == null); // text-only suppression if (socketService != null) { void chatHandler(Map ev) { try { @@ -1072,10 +1079,12 @@ Future _sendMessageInternal( if (data == null) return; final type = data['type']; final payload = data['data']; + DebugLogger.stream('Socket chat-events: type=$type'); if (type == 'chat:completion' && payload != null) { if (payload is Map) { // Provider may emit tool_calls at the top level - if (!suppressSocketContent && payload.containsKey('tool_calls')) { + // Always surface tool_calls status from socket for instant tiles + if (payload.containsKey('tool_calls')) { final tc = payload['tool_calls']; if (tc is List) { for (final call in tc) { @@ -1146,19 +1155,218 @@ Future _sendMessageInternal( } } if (payload['done'] == true) { - // Do not force finish here to avoid cutting off active streams. - // Just stop listening to further socket events for this session. + // Stop listening to further socket events for this session. try { socketService.offChatEvents(); } catch (_) {} + + // If no content was rendered yet, fetch final assistant message from server + final msgs = ref.read(chatMessagesProvider); + if (msgs.isNotEmpty && msgs.last.role == 'assistant') { + final lastContent = msgs.last.content.trim(); + if (lastContent.isEmpty) { + final apiSvc = ref.read(apiServiceProvider); + final chatId = activeConversation?.id; + final msgId = assistantMessageId; + if (apiSvc != null && chatId != null && chatId.isNotEmpty) { + Future.microtask(() async { + try { + final resp = await apiSvc.dio.get('/api/v1/chats/' + chatId); + final data = resp.data as Map; + String content = ''; + final chatObj = data['chat'] as Map?; + if (chatObj != null) { + // Prefer chat.messages list + final list = chatObj['messages']; + if (list is List) { + final target = list.firstWhere( + (m) => (m is Map && (m['id']?.toString() == msgId)), + orElse: () => null, + ); + if (target != null) { + final rawContent = (target as Map)['content']; + if (rawContent is String) { + content = rawContent; + } else if (rawContent is List) { + final textItem = rawContent.firstWhere( + (i) => i is Map && i['type'] == 'text', + orElse: () => null, + ); + if (textItem != null) { + content = textItem['text']?.toString() ?? ''; + } + } + } + } + // Fallback to history map + if (content.isEmpty) { + final history = chatObj['history']; + if (history is Map && history['messages'] is Map) { + final Map messagesMap = + (history['messages'] as Map).cast(); + final msg = messagesMap[msgId]; + if (msg is Map) { + final rawContent = msg['content']; + if (rawContent is String) { + content = rawContent; + } else if (rawContent is List) { + final textItem = rawContent.firstWhere( + (i) => i is Map && i['type'] == 'text', + orElse: () => null, + ); + if (textItem != null) { + content = textItem['text']?.toString() ?? ''; + } + } + } + } + } + } + + if (content.isNotEmpty) { + ref + .read(chatMessagesProvider.notifier) + .replaceLastMessageContent(content); + } + } catch (_) { + // Swallow; we'll still finish streaming + } finally { + ref.read(chatMessagesProvider.notifier).finishStreaming(); + } + }); + return; // Defer finish to microtask + } + } + } + // Normal path: finish now + ref.read(chatMessagesProvider.notifier).finishStreaming(); } } + } else if (type == 'request:chat:completion' && payload != null) { + // Mirror web client's execute path: listen on provided dynamic channel + final channel = payload['channel']; + if (channel is String && channel.isNotEmpty) { + DebugLogger.stream('Socket request:chat:completion channel=$channel'); + void channelLineHandler(dynamic line) { + try { + if (line is String) { + final s = line.trim(); + DebugLogger.stream('Socket [' + channel + '] line=' + (s.length > 160 ? s.substring(0, 160) + '…' : s)); + if (s == '[DONE]' || s == 'DONE') { + socketService.offEvent(channel); + // Channel completed + ref.read(chatMessagesProvider.notifier).finishStreaming(); + return; + } + if (s.startsWith('data:')) { + final dataStr = s.substring(5).trim(); + if (dataStr == '[DONE]') { + socketService.offEvent(channel); + ref.read(chatMessagesProvider.notifier).finishStreaming(); + return; + } + // Try to parse OpenAI-style delta JSON + try { + final Map j = jsonDecode(dataStr); + final choices = j['choices']; + if (choices is List && choices.isNotEmpty) { + final choice = choices.first; + final delta = choice is Map ? choice['delta'] : null; + if (delta is Map) { + if (delta.containsKey('content')) { + final c = delta['content']?.toString() ?? ''; + if (c.isNotEmpty) { + DebugLogger.stream('Socket [' + channel + '] delta.content len=' + c.length.toString()); + } + } + // Surface tool_calls status + if (delta.containsKey('tool_calls')) { + DebugLogger.stream('Socket [' + channel + '] delta.tool_calls detected'); + final tc = delta['tool_calls']; + if (tc is List) { + for (final call in tc) { + if (call is Map) { + final fn = call['function']; + final name = (fn is Map && fn['name'] is String) + ? fn['name'] as String + : null; + if (name is String && name.isNotEmpty) { + final status = '\n
Executing...\n
\n'; + ref.read(chatMessagesProvider.notifier).appendToLastMessage(status); + } + } + } + } + } + // Append streamed content + final content = delta['content']?.toString() ?? ''; + if (content.isNotEmpty) { + ref.read(chatMessagesProvider.notifier).appendToLastMessage(content); + } + } + } + } catch (_) { + // Non-JSON line: append as-is + if (s.isNotEmpty) { + ref.read(chatMessagesProvider.notifier).appendToLastMessage(s); + } + } + } else { + // Plain text line + if (s.isNotEmpty) { + ref.read(chatMessagesProvider.notifier).appendToLastMessage(s); + } + } + } else if (line is Map) { + // If server sends { done: true } via channel + final done = line['done'] == true; + if (done) { + socketService.offEvent(channel); + ref.read(chatMessagesProvider.notifier).finishStreaming(); + return; + } + } + } catch (_) {} + } + + // Register dynamic channel listener + try { + socketService.onEvent(channel, channelLineHandler); + } catch (_) {} + } + } else if (type == 'execute:tool' && payload != null) { + // Show an executing tile immediately using provided tool info + try { + final name = payload['name']?.toString() ?? 'tool'; + DebugLogger.stream('Socket execute:tool name=' + name); + final status = '\n
Executing...\n
\n'; + ref.read(chatMessagesProvider.notifier).appendToLastMessage(status); + } catch (_) {} } } catch (_) {} } socketService.onChatEvents(chatHandler); + // Also mirror channel-events like the web client + void channelEventsHandler(Map ev) { + try { + final data = ev['data']; + if (data == null) return; + final type = data['type']; + final payload = data['data']; + DebugLogger.stream('Socket channel-events: type=' + type.toString()); + // Handle generic channel progress messages if needed + if (type == 'message' && payload is Map) { + final content = payload['content']?.toString() ?? ''; + if (content.isNotEmpty) { + ref.read(chatMessagesProvider.notifier).appendToLastMessage(content); + } + } + } catch (_) {} + } + socketService.onChannelEvents(channelEventsHandler); Future.delayed(const Duration(seconds: 90), () { try { socketService.offChatEvents(); + socketService.offChannelEvents(); } catch (_) {} }); } @@ -1432,8 +1640,11 @@ Future _sendMessageInternal( } // Allow socket content again for future sessions (harmless if already false) suppressSocketContent = false; - // Mark streaming as complete immediately for better UX - ref.read(chatMessagesProvider.notifier).finishStreaming(); + // If this path was SSE-driven (no background socket), finish now. + // Otherwise keep streaming state until socket/dynamic channel signals done. + if (socketService == null) { + ref.read(chatMessagesProvider.notifier).finishStreaming(); + } // Send chat completed notification to OpenWebUI final messages = ref.read(chatMessagesProvider); diff --git a/lib/features/chat/widgets/assistant_message_widget.dart b/lib/features/chat/widgets/assistant_message_widget.dart index d37f9f0..e90a0e7 100644 --- a/lib/features/chat/widgets/assistant_message_widget.dart +++ b/lib/features/chat/widgets/assistant_message_widget.dart @@ -171,12 +171,11 @@ class _AssistantMessageWidgetState extends ConsumerState } void _updateTypingIndicatorGate() { - // Only show typing indicator if streaming and nothing renderable yet, - // and only after a short delay to avoid flicker when content arrives quickly. + // Show typing indicator while streaming until we have any renderable segments + // (tool tiles or actual text). Use a short delay to avoid flicker. _typingGateTimer?.cancel(); final hasRenderable = _hasRenderableSegments; - final contentEmpty = (widget.message.content ?? '').trim().isEmpty; - if (widget.isStreaming && !hasRenderable && contentEmpty) { + if (widget.isStreaming && !hasRenderable) { _allowTypingIndicator = false; _typingGateTimer = Timer(const Duration(milliseconds: 150), () { if (mounted) { @@ -367,11 +366,32 @@ class _AssistantMessageWidgetState extends ConsumerState } bool get _hasRenderableSegments { - for (final seg in _toolSegments) { - if ((seg.isToolCall && seg.entry != null) || - ((seg.text ?? '').trim().isNotEmpty)) { - return true; + bool _textRenderable(String t) { + String cleaned = t; + // Hide tool_calls blocks entirely + cleaned = cleaned.replaceAll( + RegExp( + r']*>[\s\S]*?<\/details>', + multiLine: true, + dotAll: true, + ), + '', + ); + // If last
is unclosed, drop tail to avoid rendering raw tag + final lastOpen = cleaned.lastIndexOf('= 0) { + final tail = cleaned.substring(lastOpen); + if (!tail.contains('
')) { + cleaned = cleaned.substring(0, lastOpen); + } } + return cleaned.trim().isNotEmpty; + } + + for (final seg in _toolSegments) { + if (seg.isToolCall && seg.entry != null) return true; + final text = seg.text ?? ''; + if (_textRenderable(text)) return true; } return false; } @@ -623,24 +643,22 @@ class _AssistantMessageWidgetState extends ConsumerState return const SizedBox.shrink(); } - // For streaming, hide any tool_calls
blocks that may be incomplete - // to avoid showing raw tag text; tiles will render once blocks complete. - String cleaned = content; - if (widget.isStreaming) { - cleaned = cleaned.replaceAll( - RegExp( - r']*>[\s\S]*?<\/details>', - multiLine: true, - dotAll: true, - ), - '', - ); - final lastOpen = cleaned.lastIndexOf('= 0) { - final tail = cleaned.substring(lastOpen); - if (!tail.contains('
')) { - cleaned = cleaned.substring(0, lastOpen); - } + // Always hide tool_calls blocks; tiles render them separately. + String cleaned = content.replaceAll( + RegExp( + r']*>[\s\S]*?<\/details>', + multiLine: true, + dotAll: true, + ), + '', + ); + + // If there's an unclosed
, drop the tail to avoid raw tags. + final lastOpen = cleaned.lastIndexOf('= 0) { + final tail = cleaned.substring(lastOpen); + if (!tail.contains('
')) { + cleaned = cleaned.substring(0, lastOpen); } } @@ -805,14 +823,22 @@ class _AssistantMessageWidgetState extends ConsumerState children: [ // Increase spacing between assistant name and typing indicator const SizedBox(height: Spacing.md), - Row( - children: [ - _buildTypingDot(0), - const SizedBox(width: Spacing.xs), - _buildTypingDot(200), - const SizedBox(width: Spacing.xs), - _buildTypingDot(400), - ], + // Give the dots breathing room to avoid any clip from transitions + Padding( + padding: const EdgeInsets.only(left: 4, bottom: 4), + child: SizedBox( + height: 14, + child: Row( + mainAxisSize: MainAxisSize.min, + children: [ + _buildTypingDot(0), + const SizedBox(width: Spacing.xs), + _buildTypingDot(200), + const SizedBox(width: Spacing.xs), + _buildTypingDot(400), + ], + ), + ), ), ], ); @@ -822,8 +848,8 @@ class _AssistantMessageWidgetState extends ConsumerState Widget _buildTypingDot(int delay) { return Container( - width: 8, - height: 8, + width: 10, + height: 10, decoration: BoxDecoration( color: context.conduitTheme.textSecondary.withValues(alpha: 0.6), borderRadius: BorderRadius.circular(AppBorderRadius.xs), @@ -833,12 +859,12 @@ class _AssistantMessageWidgetState extends ConsumerState .scale( duration: const Duration(milliseconds: 1000), begin: const Offset(1, 1), - end: const Offset(1.3, 1.3), + end: const Offset(1.25, 1.25), ) .then(delay: Duration(milliseconds: delay)) .scale( duration: const Duration(milliseconds: 1000), - begin: const Offset(1.3, 1.3), + begin: const Offset(1.25, 1.25), end: const Offset(1, 1), ); }