From e752a27781d531be22f8502775bfd09acec745f9 Mon Sep 17 00:00:00 2001 From: cogwheel0 <172976095+cogwheel0@users.noreply.github.com> Date: Sun, 31 Aug 2025 14:02:44 +0530 Subject: [PATCH] feat: proper socket implementation --- lib/core/providers/app_providers.dart | 22 + lib/core/services/api_service.dart | 386 ++++++++++++++- lib/core/services/socket_service.dart | 82 ++++ lib/core/utils/tool_calls_parser.dart | 227 +++++++++ .../chat/providers/chat_providers.dart | 463 ++++++++++++++---- .../widgets/assistant_message_widget.dart | 281 ++++++++--- pubspec.lock | 18 +- pubspec.yaml | 2 + 8 files changed, 1321 insertions(+), 160 deletions(-) create mode 100644 lib/core/services/socket_service.dart create mode 100644 lib/core/utils/tool_calls_parser.dart diff --git a/lib/core/providers/app_providers.dart b/lib/core/providers/app_providers.dart index 1e7a747..bf0be92 100644 --- a/lib/core/providers/app_providers.dart +++ b/lib/core/providers/app_providers.dart @@ -20,6 +20,7 @@ import '../models/file_info.dart'; import '../models/knowledge_base.dart'; import '../services/settings_service.dart'; import '../services/optimized_storage_service.dart'; +import '../services/socket_service.dart'; import '../utils/debug_logger.dart'; // Storage providers @@ -188,6 +189,27 @@ final apiServiceProvider = Provider((ref) { ); }); +// Socket.IO service provider +final socketServiceProvider = Provider((ref) { + final reviewerMode = ref.watch(reviewerModeProvider); + if (reviewerMode) return null; + + final activeServer = ref.watch(activeServerProvider); + final token = ref.watch(authTokenProvider3); + + return activeServer.maybeWhen( + data: (server) { + if (server == null) return null; + final s = SocketService(serverConfig: server, authToken: token); + // best-effort connect; errors handled internally + // ignore unawaited_futures + s.connect(); + return s; + }, + orElse: () => null, + ); +}); + // Attachment upload queue provider final attachmentUploadQueueProvider = Provider((ref) { final api = ref.watch(apiServiceProvider); diff --git a/lib/core/services/api_service.dart b/lib/core/services/api_service.dart index 489c6b2..467d583 100644 --- a/lib/core/services/api_service.dart +++ b/lib/core/services/api_service.dart @@ -15,6 +15,7 @@ import '../auth/api_auth_interceptor.dart'; import '../validation/validation_interceptor.dart'; import '../error/api_error_interceptor.dart'; import 'sse_parser.dart'; +// Tool-call details are parsed in the UI layer to render collapsible blocks import 'stream_recovery_service.dart'; import 'persistent_streaming_service.dart'; import '../utils/debug_logger.dart'; @@ -2355,12 +2356,17 @@ class ApiService { bool enableWebSearch = false, bool enableImageGeneration = false, Map? modelItem, + String? sessionIdOverride, + List>? toolServers, + Map? backgroundTasks, }) { final streamController = StreamController(); // Generate unique IDs final messageId = const Uuid().v4(); - final sessionId = const Uuid().v4().substring(0, 20); + final sessionId = (sessionIdOverride != null && sessionIdOverride.isNotEmpty) + ? sessionIdOverride + : const Uuid().v4().substring(0, 20); // NOTE: Previously used to branch for Gemini-specific handling; not needed now. @@ -2455,6 +2461,23 @@ class ApiService { if (toolIds != null && toolIds.isNotEmpty) { data['tool_ids'] = toolIds; debugPrint('DEBUG: Including tool_ids in SSE request: $toolIds'); + + // Hint server to use native function calling when tools are selected + // This enables provider-native tool execution paths and consistent UI events + try { + final params = (data['params'] as Map? ) ?? {}; + params['function_calling'] = 'native'; + data['params'] = params; + debugPrint('DEBUG: Set params.function_calling = native'); + } catch (_) { + // Non-fatal; continue without forcing native mode + } + } + + // Include tool_servers if provided (for native function calling with OpenAPI servers) + if (toolServers != null && toolServers.isNotEmpty) { + data['tool_servers'] = toolServers; + debugPrint('DEBUG: Including tool_servers in request (${toolServers.length})'); } // Include non-image files at the top level as expected by Open WebUI @@ -2482,8 +2505,53 @@ class ApiService { debugPrint('DEBUG: session_id value: ${data['session_id']}'); debugPrint('DEBUG: id value: ${data['id']}'); - // Use SSE streaming with proper parser - _streamSSE(data, streamController, messageId); + // If tools are requested, use background task flow to allow server-side execution. + // Open WebUI executes tools and continues the response outside of the + // provider SSE. That path requires background task mode (session_id + id + chat_id). + if (conversationId != null) { + // Attach identifiers to trigger background task processing on the server + data['session_id'] = sessionId; + data['id'] = messageId; + data['chat_id'] = conversationId; + + // Attach background_tasks if provided + if (backgroundTasks != null && backgroundTasks.isNotEmpty) { + data['background_tasks'] = backgroundTasks; + } + + debugPrint('DEBUG: Initiating background tools flow (task-based)'); + debugPrint('DEBUG: Posting to /api/chat/completions (no SSE)'); + + // Fire in background; poll chat for updates and stream deltas to UI + () async { + try { + final resp = await _dio.post('/api/chat/completions', data: data); + final respData = resp.data; + final taskId = (respData is Map) ? (respData['task_id']?.toString()) : null; + debugPrint('DEBUG: Background task created: $taskId'); + + // If no session/socket provided, fall back to polling for updates. + if (sessionIdOverride == null || sessionIdOverride.isEmpty) { + await _pollChatForMessageUpdates( + chatId: conversationId!, + messageId: messageId, + streamController: streamController, + ); + } else { + // Close the controller so listeners don't hang waiting for chunks + if (!streamController.isClosed) { + streamController.close(); + } + } + } catch (e) { + debugPrint('DEBUG: Background tools flow failed: $e'); + if (!streamController.isClosed) streamController.close(); + } + }(); + } else { + // Use SSE streaming with proper parser + _streamSSE(data, streamController, messageId); + } return ( stream: streamController.stream, @@ -2492,6 +2560,189 @@ class ApiService { ); } + // Poll the server chat until the assistant message is populated with tool results, + // then stream deltas to the UI and close. + Future _pollChatForMessageUpdates({ + required String chatId, + required String messageId, + required StreamController streamController, + }) async { + String last = ''; + final started = DateTime.now(); + + bool containsDone(String s) => + s.contains('
; + + // Locate assistant content from multiple shapes + String content = ''; + + Map? chatObj = + (data['chat'] is Map) ? data['chat'] as Map : null; + + // 1) Preferred: chat.messages (list) – try exact id first + if (chatObj != null && chatObj['messages'] is List) { + final List messagesList = chatObj['messages'] as List; + final target = messagesList.firstWhere( + (m) => (m is Map && (m['id']?.toString() == messageId)), + orElse: () => null, + ); + if (target != null) { + final rawContent = (target as Map)['content']; + if (rawContent is List) { + final textItem = rawContent.firstWhere( + (i) => i is Map && i['type'] == 'text', + orElse: () => null, + ); + if (textItem != null) { + content = textItem['text']?.toString() ?? ''; + } + } else if (rawContent is String) { + content = rawContent; + } + } + } + + // 2) Fallback: chat.history.messages (map) – try exact id + if (content.isEmpty && chatObj != null) { + final history = chatObj['history']; + if (history is Map && history['messages'] is Map) { + final Map messagesMap = + (history['messages'] as Map).cast(); + final msg = messagesMap[messageId]; + if (msg is Map) { + final rawContent = msg['content']; + if (rawContent is String) { + content = rawContent; + } else if (rawContent is List) { + final textItem = rawContent.firstWhere( + (i) => i is Map && i['type'] == 'text', + orElse: () => null, + ); + if (textItem != null) { + content = textItem['text']?.toString() ?? ''; + } + } + } + } + } + + // 3) Last resort: top-level messages (list) – try exact id + if (content.isEmpty && data['messages'] is List) { + final List topMessages = data['messages'] as List; + final target = topMessages.firstWhere( + (m) => (m is Map && (m['id']?.toString() == messageId)), + orElse: () => null, + ); + if (target != null) { + final rawContent = (target as Map)['content']; + if (rawContent is String) { + content = rawContent; + } else if (rawContent is List) { + final textItem = rawContent.firstWhere( + (i) => i is Map && i['type'] == 'text', + orElse: () => null, + ); + if (textItem != null) { + content = textItem['text']?.toString() ?? ''; + } + } + } + } + + // 4) If nothing found by id, fall back to the latest assistant message + if (content.isEmpty) { + // Prefer chat.messages list + if (chatObj != null && chatObj['messages'] is List) { + final List messagesList = chatObj['messages'] as List; + // Find last assistant + for (int i = messagesList.length - 1; i >= 0; i--) { + final m = messagesList[i]; + if (m is Map && (m['role']?.toString() == 'assistant')) { + final rawContent = m['content']; + if (rawContent is String) { + content = rawContent; + } else if (rawContent is List) { + final textItem = rawContent.firstWhere( + (i) => i is Map && i['type'] == 'text', + orElse: () => null, + ); + if (textItem != null) { + content = textItem['text']?.toString() ?? ''; + } + } + if (content.isNotEmpty) break; + } + } + } + + // Try history map if still empty + if (content.isEmpty && chatObj != null) { + final history = chatObj['history']; + if (history is Map && history['messages'] is Map) { + final Map msgMapDyn = history['messages'] as Map; + // Iterate by values; no guaranteed ordering, but often sufficient + for (final entry in msgMapDyn.values) { + if (entry is Map && (entry['role']?.toString() == 'assistant')) { + final rawContent = entry['content']; + if (rawContent is String) { + content = rawContent; + } else if (rawContent is List) { + final textItem = rawContent.firstWhere( + (i) => i is Map && i['type'] == 'text', + orElse: () => null, + ); + if (textItem != null) { + content = textItem['text']?.toString() ?? ''; + } + } + if (content.isNotEmpty) break; + } + } + } + } + } + + if (content.isEmpty) { + continue; + } + + // Stream only the delta when content grows monotonically + if (content.startsWith(last)) { + final delta = content.substring(last.length); + if (delta.isNotEmpty && !streamController.isClosed) { + streamController.add(delta); + } + } else { + // Fallback: replace entire content by emitting a separator + full content + if (!streamController.isClosed) { + streamController.add('\n'); + streamController.add(content); + } + } + last = content; + + // Stop when we detect done=true on tool_calls or when content stabilizes + if (containsDone(content)) { + break; + } + } catch (e) { + // Ignore transient errors and continue polling + } + } + + if (!streamController.isClosed) { + streamController.close(); + } + } + // SSE streaming with persistent background support - Main Implementation void _streamSSE( Map data, @@ -2873,6 +3124,26 @@ class ApiService { // We do NOT return here; model can send content alongside reasoning later } + // 1a) Surface tool call deltas as lightweight status updates + // Some providers stream tool_calls without content; show a hint so UI isn't stuck + if (delta.containsKey('tool_calls')) { + final tc = delta['tool_calls']; + if (tc is List) { + for (final call in tc) { + if (call is Map) { + final fn = call['function']; + final name = (fn is Map && fn['name'] is String) ? fn['name'] as String : null; + if (name is String && name.isNotEmpty) { + final status = '\n
Executing...\n
\n'; + if (!streamController.isClosed) { + streamController.add(status); + } + } + } + } + } + } + // Extract content if (delta.containsKey('content')) { final content = delta['content'] as String?; @@ -2904,17 +3175,19 @@ class ApiService { debugPrint( 'Persistent: Stream finished with reason: $finishReason', ); - // Ensure reasoning block is closed when finishing - _closeReasoningBlockIfOpen(streamController, persistentStreamId); - if (!streamController.isClosed) { - streamController.close(); + // Do NOT close on tool_calls; server will continue with tool execution updates + if (finishReason != 'tool_calls') { + _closeReasoningBlockIfOpen(streamController, persistentStreamId); + if (!streamController.isClosed) { + streamController.close(); + } + return; } - return; } } else if (choice.containsKey('finish_reason')) { // Check for completion at choice level final finishReason = choice['finish_reason']; - if (finishReason != null) { + if (finishReason != null && finishReason != 'tool_calls') { debugPrint( 'Persistent: Stream finished with reason: $finishReason', ); @@ -2969,14 +3242,103 @@ class ApiService { _closeReasoningBlockIfOpen(streamController, persistentStreamId); - if (!streamController.isClosed) { - streamController.add(content); + // Emit only the delta when server sends cumulative content + try { + final meta = + persistentService.getStreamMetadata(persistentStreamId); + final last = (meta != null && meta['lastContent'] is String) + ? (meta['lastContent'] as String) + : ''; + + String toEmit; + if (content.startsWith(last)) { + toEmit = content.substring(last.length); + } else { + // Fallback: emit suffix after longest common prefix + int i = 0; + final minLen = last.length < content.length + ? last.length + : content.length; + while (i < minLen && last.codeUnitAt(i) == content.codeUnitAt(i)) { + i++; + } + toEmit = content.substring(i); + } + + if (toEmit.isNotEmpty && !streamController.isClosed) { + streamController.add(toEmit); + } + + // Update persistent progress with the full content snapshot + persistentService.updateStreamProgress( + persistentStreamId, + chunkSequence: chunkSequence, + content: content, + ); + } catch (_) { + // Best-effort fallback: append as-is + if (!streamController.isClosed) { + streamController.add(content); + } + persistentService.updateStreamProgress( + persistentStreamId, + chunkSequence: chunkSequence, + content: content, + ); + } + } + } + } + + // Handle Open WebUI aggregated content blocks + // Server emits top-level { content: "...serialized blocks..." } updates + if (json.containsKey('content')) { + final contentVal = json['content']; + if (contentVal is String && contentVal.isNotEmpty) { + // Close reasoning section before appending rich content + _closeReasoningBlockIfOpen(streamController, persistentStreamId); + + // Emit only the delta when server sends cumulative content + try { + final meta = + persistentService.getStreamMetadata(persistentStreamId); + final last = (meta != null && meta['lastContent'] is String) + ? (meta['lastContent'] as String) + : ''; + + String toEmit; + if ((contentVal as String).startsWith(last)) { + toEmit = contentVal.substring(last.length); + } else { + // Fallback: emit suffix after longest common prefix + int i = 0; + final s = contentVal as String; + final minLen = last.length < s.length ? last.length : s.length; + while (i < minLen && last.codeUnitAt(i) == s.codeUnitAt(i)) { + i++; + } + toEmit = s.substring(i); } + if (toEmit.isNotEmpty && !streamController.isClosed) { + streamController.add(toEmit); + } + + // Update persistent progress with the full content snapshot persistentService.updateStreamProgress( persistentStreamId, chunkSequence: chunkSequence, - content: content, // Full content, not appended + content: contentVal, + ); + } catch (_) { + // Best-effort fallback: append as-is + if (!streamController.isClosed) { + streamController.add(contentVal); + } + persistentService.updateStreamProgress( + persistentStreamId, + chunkSequence: chunkSequence, + content: contentVal, ); } } diff --git a/lib/core/services/socket_service.dart b/lib/core/services/socket_service.dart new file mode 100644 index 0000000..6c65b81 --- /dev/null +++ b/lib/core/services/socket_service.dart @@ -0,0 +1,82 @@ +import 'package:socket_io_client/socket_io_client.dart' as IO; +import 'package:flutter/foundation.dart'; +import '../models/server_config.dart'; + +class SocketService { + final ServerConfig serverConfig; + final String? authToken; + IO.Socket? _socket; + + SocketService({required this.serverConfig, required this.authToken}); + + String? get sessionId => _socket?.id; + IO.Socket? get socket => _socket; + + bool get isConnected => _socket?.connected == true; + + Future connect({bool force = false}) async { + if (_socket != null && _socket!.connected && !force) return; + + try { + _socket?.dispose(); + } catch (_) {} + + final base = serverConfig.url.replaceFirst(RegExp(r'/+$'), ''); + final path = '/ws/socket.io'; + + _socket = IO.io( + base, + IO.OptionBuilder() + .setTransports(['websocket']) + .setPath(path) + .setExtraHeaders( + authToken != null && authToken!.isNotEmpty + ? { + 'Authorization': 'Bearer $authToken', + } + : {}, + ) + .build(), + ); + + _socket!.on('connect', (_) { + debugPrint('Socket connected: ${_socket!.id}'); + if (authToken != null && authToken!.isNotEmpty) { + _socket!.emit('user-join', { + 'auth': {'token': authToken} + }); + } + }); + + _socket!.on('connect_error', (err) { + debugPrint('Socket connect_error: $err'); + }); + + _socket!.on('disconnect', (reason) { + debugPrint('Socket disconnected: $reason'); + }); + } + + void onChatEvents(void Function(Map event) handler) { + _socket?.on('chat-events', (data) { + try { + if (data is Map) { + handler(data); + } else if (data is Map) { + handler(Map.from(data)); + } + } catch (_) {} + }); + } + + void offChatEvents() { + _socket?.off('chat-events'); + } + + void dispose() { + try { + _socket?.dispose(); + } catch (_) {} + _socket = null; + } +} diff --git a/lib/core/utils/tool_calls_parser.dart b/lib/core/utils/tool_calls_parser.dart new file mode 100644 index 0000000..de99794 --- /dev/null +++ b/lib/core/utils/tool_calls_parser.dart @@ -0,0 +1,227 @@ +import 'dart:convert'; + +/// Parsed representation of one tool call emitted as a `
` block +class ToolCallEntry { + final String id; + final String name; + final bool done; + final dynamic arguments; // decoded JSON when possible, else String + final dynamic result; // decoded JSON when possible, else String + final List? files; // decoded JSON array when present + + const ToolCallEntry({ + required this.id, + required this.name, + required this.done, + this.arguments, + this.result, + this.files, + }); +} + +/// Container for extracted tool calls and the remaining main content +class ToolCallsContent { + final List toolCalls; + final String mainContent; + final String originalContent; + + const ToolCallsContent({ + required this.toolCalls, + required this.mainContent, + required this.originalContent, + }); +} + +/// Utility to parse
blocks from content +class ToolCallsParser { + /// Represents a mixed stream of text and tool-call entries in original order + /// as they appeared in the content. + static List? segments(String content) { + if (content.isEmpty || !content.contains(']*)>\s*[^<]*<\/summary>\s*<\/details>', + multiLine: true, + dotAll: true, + ); + + final matches = detailsRegex.allMatches(content).toList(); + if (matches.isEmpty) return null; + + final segs = []; + int lastEnd = 0; + + for (final m in matches) { + // Text before this block + if (m.start > lastEnd) { + segs.add(ToolCallsSegment.text(content.substring(lastEnd, m.start))); + } + + final fullMatch = m.group(0) ?? ''; + final attrs = m.group(1) ?? ''; + + if (attrs.contains('type="tool_calls"')) { + String? _attr(String name) { + final r = RegExp('$name="([^"]*)"'); + final mm = r.firstMatch(attrs); + return mm != null ? _unescapeHtml(mm.group(1) ?? '') : null; + } + + final id = _attr('id') ?? ''; + final name = _attr('name') ?? 'tool'; + final done = (_attr('done') == 'true'); + final args = _tryDecodeJson(_attr('arguments')); + final result = _tryDecodeJson(_attr('result')); + final files = _tryDecodeJson(_attr('files')); + + final entry = ToolCallEntry( + id: id.isNotEmpty ? id : '${name}_${m.start}', + name: name, + done: done, + arguments: args, + result: result, + files: (files is List) ? files : null, + ); + segs.add(ToolCallsSegment.entry(entry)); + } else { + // Not a tool_calls block: keep it as text + segs.add(ToolCallsSegment.text(fullMatch)); + } + + lastEnd = m.end; + } + + // Tail text + if (lastEnd < content.length) { + segs.add(ToolCallsSegment.text(content.substring(lastEnd))); + } + + return segs; + } + /// Extracts tool call blocks and returns the remaining content with those blocks removed. + static ToolCallsContent? parse(String content) { + if (content.isEmpty || !content.contains(']*)>\s*[^<]*<\/summary>\s*<\/details>', + multiLine: true, + dotAll: true, + ); + + final matches = detailsRegex.allMatches(content).toList(); + if (matches.isEmpty) return null; + + final calls = []; + for (final m in matches) { + final attrs = m.group(1) ?? ''; + if (!attrs.contains('type="tool_calls"')) continue; + + String? _attr(String name) { + final r = RegExp('$name="([^"]*)"'); + final mm = r.firstMatch(attrs); + return mm != null ? _unescapeHtml(mm.group(1) ?? '') : null; + } + + final id = _attr('id') ?? ''; + final name = _attr('name') ?? 'tool'; + final done = (_attr('done') == 'true'); + final args = _tryDecodeJson(_attr('arguments')); + final result = _tryDecodeJson(_attr('result')); + final files = _tryDecodeJson(_attr('files')); + + calls.add( + ToolCallEntry( + id: id.isNotEmpty ? id : '${name}_${m.start}', + name: name, + done: done, + arguments: args, + result: result, + files: (files is List) ? files : null, + ), + ); + } + + if (calls.isEmpty) return null; + + final main = content.replaceAll(detailsRegex, '').trim(); + return ToolCallsContent(toolCalls: calls, mainContent: main, originalContent: content); + } + + /// Legacy helper that summarizes tool blocks to text (kept for fallback) + static String summarize(String content) { + final parsed = parse(content); + if (parsed == null) return content; + final buf = StringBuffer(); + for (final c in parsed.toolCalls) { + buf.writeln(c.done ? 'Tool Executed: ${c.name}' : 'Running tool: ${c.name}…'); + final args = _prettyMaybe(c.arguments, max: 400); + final res = _prettyMaybe(c.result, max: 800); + if (args.isNotEmpty) { + buf.writeln('\nArguments:\n```json'); + buf.writeln(args); + buf.writeln('```'); + } + if (res.isNotEmpty) { + buf.writeln('\nResult:\n```json'); + buf.writeln(res); + buf.writeln('```'); + } + buf.writeln(); + } + buf.writeln(parsed.mainContent); + return buf.toString().trim(); + } + + static dynamic _tryDecodeJson(String? raw) { + if (raw == null || raw.trim().isEmpty) return null; + try { + dynamic decoded = json.decode(raw); + if (decoded is String) { + final s = decoded.trim(); + if ((s.startsWith('{') && s.endsWith('}')) || (s.startsWith('[') && s.endsWith(']'))) { + try { + decoded = json.decode(s); + } catch (_) {} + } + } + return decoded; + } catch (_) { + return raw; + } + } + + static String _prettyMaybe(dynamic value, {int max = 600}) { + if (value == null) return ''; + try { + final pretty = const JsonEncoder.withIndent(' ').convert(value); + return pretty.length > max ? pretty.substring(0, max) + '\n…' : pretty; + } catch (_) { + final raw = value.toString(); + return raw.length > max ? raw.substring(0, max) + '…' : raw; + } + } + + static String _unescapeHtml(String input) { + return input + .replaceAll('"', '"') + .replaceAll('"', '"') + .replaceAll(''', "'") + .replaceAll(''', "'") + .replaceAll('<', '<') + .replaceAll('>', '>') + .replaceAll('&', '&'); + } +} + +/// Ordered piece of content: either plain text or a tool-call entry +class ToolCallsSegment { + final String? text; + final ToolCallEntry? entry; + + const ToolCallsSegment._({this.text, this.entry}); + factory ToolCallsSegment.text(String text) => ToolCallsSegment._(text: text); + factory ToolCallsSegment.entry(ToolCallEntry entry) => + ToolCallsSegment._(entry: entry); + + bool get isToolCall => entry != null; +} diff --git a/lib/features/chat/providers/chat_providers.dart b/lib/features/chat/providers/chat_providers.dart index 174472a..fc86f3d 100644 --- a/lib/features/chat/providers/chat_providers.dart +++ b/lib/features/chat/providers/chat_providers.dart @@ -1,5 +1,6 @@ import 'dart:async'; import 'dart:convert'; +import 'package:yaml/yaml.dart' as yaml; import 'package:flutter/foundation.dart'; import 'package:flutter_riverpod/flutter_riverpod.dart'; @@ -148,9 +149,22 @@ class ChatMessagesNotifier extends StateNotifier> { final lastMessage = state.last; if (lastMessage.role != 'assistant') return; + // Ensure we never keep the typing placeholder in persisted content + String sanitized(String s) { + const ti = '[TYPING_INDICATOR]'; + const searchBanner = '🔍 Searching the web...'; + if (s.startsWith(ti)) { + s = s.substring(ti.length); + } + if (s.startsWith(searchBanner)) { + s = s.substring(searchBanner.length); + } + return s; + } + state = [ ...state.sublist(0, state.length - 1), - lastMessage.copyWith(content: content), + lastMessage.copyWith(content: sanitized(content)), ]; } @@ -175,10 +189,17 @@ class ChatMessagesNotifier extends StateNotifier> { return; } - // If the current content is just the typing indicator, replace it instead of appending - final newContent = lastMessage.content == '[TYPING_INDICATOR]' - ? content - : lastMessage.content + content; + // Strip a leading typing indicator if present, then append delta + const ti = '[TYPING_INDICATOR]'; + const searchBanner = '🔍 Searching the web...'; + String current = lastMessage.content; + if (current.startsWith(ti)) { + current = current.substring(ti.length); + } + if (current.startsWith(searchBanner)) { + current = current.substring(searchBanner.length); + } + final newContent = current.isEmpty ? content : current + content; state = [ ...state.sublist(0, state.length - 1), @@ -196,9 +217,19 @@ class ChatMessagesNotifier extends StateNotifier> { return; } + // Remove typing indicator if present in the replacement + String sanitized = content; + const ti = '[TYPING_INDICATOR]'; + const searchBanner = '🔍 Searching the web...'; + if (sanitized.startsWith(ti)) { + sanitized = sanitized.substring(ti.length); + } + if (sanitized.startsWith(searchBanner)) { + sanitized = sanitized.substring(searchBanner.length); + } state = [ ...state.sublist(0, state.length - 1), - lastMessage.copyWith(content: content), + lastMessage.copyWith(content: sanitized), ]; } @@ -208,9 +239,20 @@ class ChatMessagesNotifier extends StateNotifier> { final lastMessage = state.last; if (lastMessage.role != 'assistant' || !lastMessage.isStreaming) return; + // Also strip any leftover typing indicator before finalizing + const ti = '[TYPING_INDICATOR]'; + const searchBanner = '🔍 Searching the web...'; + String cleaned = lastMessage.content; + if (cleaned.startsWith(ti)) { + cleaned = cleaned.substring(ti.length); + } + if (cleaned.startsWith(searchBanner)) { + cleaned = cleaned.substring(searchBanner.length); + } + state = [ ...state.sublist(0, state.length - 1), - lastMessage.copyWith(isStreaming: false), + lastMessage.copyWith(isStreaming: false, content: cleaned), ]; } @@ -429,7 +471,7 @@ Future regenerateMessage( ); ref.read(chatMessagesProvider.notifier).addMessage(assistantMessage); - // Handle streaming response + // Handle streaming response (basic chunking for this path) final chunkedStream = StreamChunker.chunkStream( stream, enableChunking: true, @@ -963,6 +1005,28 @@ Future _sendMessageInternal( } // Stream response using SSE + // Resolve Socket session for background tasks parity + final socketService = ref.read(socketServiceProvider); + final socketSessionId = socketService?.sessionId; + + // Resolve tool servers from user settings (if any) + List>? toolServers; + try { + final userSettings = await api.getUserSettings(); + final ui = userSettings['ui'] as Map?; + final rawServers = ui != null ? (ui['toolServers'] as List?) : null; + if (rawServers != null && rawServers.isNotEmpty) { + toolServers = await _resolveToolServers(rawServers, api); + } + } catch (_) {} + + // Background tasks parity with Web client (safe defaults) + final bgTasks = { + 'title_generation': true, + 'tags_generation': true, + 'follow_up_generation': true, + }; + final response = await api.sendMessage( messages: conversationMessages, model: selectedModel.id, @@ -973,6 +1037,9 @@ Future _sendMessageInternal( // handled via pre-stream client-side request above enableImageGeneration: false, modelItem: modelItem, + sessionIdOverride: socketSessionId, + toolServers: toolServers, + backgroundTasks: bgTasks, ); final stream = response.stream; @@ -990,6 +1057,70 @@ Future _sendMessageInternal( ); ref.read(chatMessagesProvider.notifier).addMessage(assistantMessage); + // If socket is available, start listening for chat-events immediately + if (socketService != null) { + void chatHandler(Map ev) { + try { + final data = ev['data']; + if (data == null) return; + final type = data['type']; + final payload = data['data']; + if (type == 'chat:completion' && payload != null) { + if (payload is Map) { + if (payload.containsKey('choices')) { + final choices = payload['choices']; + if (choices is List && choices.isNotEmpty) { + final choice = choices.first; + final delta = choice is Map ? choice['delta'] : null; + final content = (delta is Map) ? (delta['content']?.toString() ?? '') : ''; + if (content.isNotEmpty) { + ref.read(chatMessagesProvider.notifier).appendToLastMessage(content); + } + } + } + if (payload.containsKey('content')) { + final content = payload['content']?.toString() ?? ''; + if (content.isNotEmpty) { + final msgs = ref.read(chatMessagesProvider); + if (msgs.isNotEmpty && msgs.last.role == 'assistant') { + final prev = msgs.last.content; + if (prev == '[TYPING_INDICATOR]') { + ref + .read(chatMessagesProvider.notifier) + .replaceLastMessageContent(content); + } else if (content.startsWith(prev)) { + ref + .read(chatMessagesProvider.notifier) + .appendToLastMessage(content.substring(prev.length)); + } else { + ref + .read(chatMessagesProvider.notifier) + .replaceLastMessageContent(content); + } + } else { + ref + .read(chatMessagesProvider.notifier) + .appendToLastMessage(content); + } + } + } + if (payload['done'] == true) { + ref.read(chatMessagesProvider.notifier).finishStreaming(); + socketService.offChatEvents(); + } + } + } + } catch (_) {} + } + + socketService.onChatEvents(chatHandler); + Future.delayed(const Duration(seconds: 90), () { + try { + socketService.offChatEvents(); + } catch (_) {} + }); + } + // Prepare streaming and background handling BEFORE image generation final chunkedStream = StreamChunker.chunkStream( stream, @@ -1164,9 +1295,25 @@ Future _sendMessageInternal( debugPrint( 'DEBUG: No images found in generation response (pre-stream)', ); + // Do not block streaming if no images are produced + imagesAttached = true; + if (deferUntilImagesAttached && prebuffer.isNotEmpty) { + ref + .read(chatMessagesProvider.notifier) + .appendToLastMessage(prebuffer.toString()); + prebuffer.clear(); + } } } catch (e) { debugPrint('DEBUG: Image generation failed (pre-stream): $e'); + // Fail open: allow text streaming to continue + imagesAttached = true; + if (deferUntilImagesAttached && prebuffer.isNotEmpty) { + ref + .read(chatMessagesProvider.notifier) + .appendToLastMessage(prebuffer.toString()); + prebuffer.clear(); + } } } @@ -1180,6 +1327,7 @@ Future _sendMessageInternal( final streamSubscription = persistentController.stream.listen( (chunk) { + var effectiveChunk = chunk; // Check for web search indicators in the stream if (webSearchEnabled && !isSearching) { // Check if this is the start of web search @@ -1205,16 +1353,18 @@ Future _sendMessageInternal( (chunk.contains('[/SEARCHING]') || chunk.contains('Search complete'))) { isSearching = false; - // Clear the search status message + // Only update metadata; keep content to avoid flicker/indicator reappearing ref .read(chatMessagesProvider.notifier) .updateLastMessageWithFunction( (message) => message.copyWith( - content: '', metadata: {'webSearchActive': false}, ), ); - return; // Don't append this chunk + // Strip markers from this chunk and continue processing + effectiveChunk = effectiveChunk + .replaceAll('[SEARCHING]', '') + .replaceAll('[/SEARCHING]', ''); } // If we buffered chunks before images attached, flush once @@ -1223,9 +1373,11 @@ Future _sendMessageInternal( return; } - // Regular content - append to message - if (!chunk.contains('[SEARCHING]') && !chunk.contains('[/SEARCHING]')) { - ref.read(chatMessagesProvider.notifier).appendToLastMessage(chunk); + // Regular content - append to message (markers removed above) + if (effectiveChunk.trim().isNotEmpty) { + ref + .read(chatMessagesProvider.notifier) + .appendToLastMessage(effectiveChunk); } }, @@ -1312,84 +1464,54 @@ Future _sendMessageInternal( // Update local messages with server content while preserving all messages final updatedMessages = []; + final lastLocal = currentMessages.isNotEmpty ? currentMessages.last : null; for (final localMsg in currentMessages) { final serverMsg = serverMessageMap[localMsg.id]; - if (serverMsg != null && serverMsg.content.isNotEmpty) { - // Use server content if available and non-empty - // This replaces any temporary progress indicators with real content + if (serverMsg != null && serverMsg.content.isNotEmpty && + lastLocal != null && localMsg.id == lastLocal.id && + localMsg.role == 'assistant') { + // Prefer non-disruptive merge to avoid flashing typing indicator + final oldContent = localMsg.content; + final newContent = serverMsg.content; - // Stream the server content through StreamChunker for word-by-word effect - - // Clear only the last message content in-place to avoid list reset flicker - final currentList = [...currentMessages]; - final lastIndex = currentList.lastIndexWhere( - (m) => m.id == localMsg.id, - ); - if (lastIndex != -1) { - currentList[lastIndex] = currentList[lastIndex].copyWith( - content: '', - isStreaming: true, - ); + if (oldContent.trim().isEmpty || oldContent == '[TYPING_INDICATOR]') { + // Direct replacement without toggling streaming ref .read(chatMessagesProvider.notifier) - .setMessages(currentList); - } - - // Create a stream from the server content and chunk it - final serverContentStream = Stream.fromIterable([ - serverMsg.content, - ]); - final chunkedStream = StreamChunker.chunkStream( - serverContentStream, - enableChunking: true, - minChunkSize: 5, - maxChunkLength: 3, - delayBetweenChunks: const Duration(milliseconds: 25), - ); - - // Process chunks - chunkedStream.listen( - (chunk) { - ref - .read(chatMessagesProvider.notifier) - .appendToLastMessage(chunk); - }, - onDone: () { - // Mark streaming as complete - ref - .read(chatMessagesProvider.notifier) - .finishStreaming(); - }, - onError: (error) { - // Fall back to direct replacement - final currentMessages = ref.read(chatMessagesProvider); - if (currentMessages.isNotEmpty) { - final fallbackMessages = [...currentMessages]; - final lastIndex = fallbackMessages.length - 1; - fallbackMessages[lastIndex] = - fallbackMessages[lastIndex].copyWith( - content: serverMsg.content, - isStreaming: false, - ); - ref - .read(chatMessagesProvider.notifier) - .setMessages(fallbackMessages); - } - }, - ); - - // Don't add to updatedMessages here since we're streaming - continue; - } else { - // Handle case where streaming failed and we still have typing indicator - if (localMsg.content == '[TYPING_INDICATOR]') { - // Replace typing indicator with empty content so UI can show loading state + .replaceLastMessageContent(newContent); + ref.read(chatMessagesProvider.notifier).finishStreaming(); updatedMessages.add( - localMsg.copyWith(content: '', isStreaming: false), + localMsg.copyWith(content: newContent, isStreaming: false), + ); + } else if (newContent == oldContent) { + // Already in sync + updatedMessages.add(localMsg.copyWith(isStreaming: false)); + } else if (newContent.startsWith(oldContent)) { + // Append only the delta + final delta = newContent.substring(oldContent.length); + if (delta.isNotEmpty) { + ref.read(chatMessagesProvider.notifier).appendToLastMessage(delta); + } + ref.read(chatMessagesProvider.notifier).finishStreaming(); + updatedMessages.add( + localMsg.copyWith(content: newContent, isStreaming: false), ); } else { - // Keep local message as-is + // Fallback: replace full content without re-streaming + ref + .read(chatMessagesProvider.notifier) + .replaceLastMessageContent(newContent); + ref.read(chatMessagesProvider.notifier).finishStreaming(); + updatedMessages.add( + localMsg.copyWith(content: newContent, isStreaming: false), + ); + } + } else { + // Keep local message as-is (strip typing indicator if it slipped through) + if (localMsg.content == '[TYPING_INDICATOR]') { + updatedMessages.add(localMsg.copyWith(content: '', isStreaming: false)); + } else { updatedMessages.add(localMsg); } } @@ -2113,3 +2235,178 @@ final stopGenerationProvider = Provider((ref) { ref.read(chatMessagesProvider.notifier).finishStreaming(); }; }); + +// ========== Tool Servers (OpenAPI) Helpers ========== + +Future>> _resolveToolServers( + List rawServers, + dynamic api, +) async { + final List> resolved = []; + for (final s in rawServers) { + try { + if (s is! Map) continue; + final cfg = s['config']; + if (cfg is Map && cfg['enable'] != true) continue; + + final url = (s['url'] ?? '').toString(); + final path = (s['path'] ?? '').toString(); + if (url.isEmpty || path.isEmpty) continue; + final fullUrl = path.contains('://') + ? path + : '$url${path.startsWith('/') ? '' : '/'}$path'; + + // Fetch OpenAPI spec (supports YAML/JSON) + Map? openapi; + try { + final resp = await api.dio.get(fullUrl); + final ct = resp.headers.map['content-type']?.join(',') ?? ''; + if (fullUrl.toLowerCase().endsWith('.yaml') || + fullUrl.toLowerCase().endsWith('.yml') || + ct.contains('yaml')) { + final doc = yaml.loadYaml(resp.data); + openapi = json.decode(json.encode(doc)) as Map; + } else { + final data = resp.data; + if (data is Map) { + openapi = data; + } else if (data is String) { + openapi = json.decode(data) as Map; + } + } + } catch (_) { + continue; + } + if (openapi == null) continue; + + // Convert OpenAPI to tool specs + final specs = _convertOpenApiToToolPayload(openapi); + resolved.add({ + 'url': url, + 'openapi': openapi, + 'info': openapi['info'], + 'specs': specs, + }); + } catch (_) { + continue; + } + } + return resolved; +} + +Map? _resolveRef(String ref, Map? components) { + // e.g., #/components/schemas/MySchema + if (!ref.startsWith('#/')) return null; + final parts = ref.split('/'); + if (parts.length < 4) return null; + final type = parts[2]; // schemas + final name = parts[3]; + final section = components?[type]; + if (section is Map) { + final schema = section[name]; + if (schema is Map) return Map.from(schema); + } + return null; +} + +Map _resolveSchemaSimple( + dynamic schema, + Map? components, +) { + if (schema is Map) { + if (schema.containsKey(r'$ref')) { + final ref = schema[r'$ref'] as String; + final resolved = _resolveRef(ref, components); + if (resolved != null) return _resolveSchemaSimple(resolved, components); + } + final type = schema['type']; + final out = {}; + if (type is String) { + out['type'] = type; + if (schema['description'] != null) out['description'] = schema['description']; + if (type == 'object') { + out['properties'] = {}; + if (schema['required'] is List) out['required'] = List.from(schema['required']); + final props = schema['properties']; + if (props is Map) { + props.forEach((k, v) { + out['properties'][k] = _resolveSchemaSimple(v, components); + }); + } + } else if (type == 'array') { + out['items'] = _resolveSchemaSimple(schema['items'], components); + } + } + return out; + } + return {}; +} + +List> _convertOpenApiToToolPayload(Map openApi) { + final tools = >[]; + final paths = openApi['paths']; + if (paths is! Map) return tools; + paths.forEach((path, methods) { + if (methods is! Map) return; + methods.forEach((method, operation) { + if (operation is Map && operation['operationId'] != null) { + final tool = { + 'name': operation['operationId'], + 'description': operation['description'] ?? operation['summary'] ?? 'No description available.', + 'parameters': { + 'type': 'object', + 'properties': {}, + 'required': [], + }, + }; + // Parameters + final params = operation['parameters']; + if (params is List) { + for (final p in params) { + if (p is Map) { + final name = p['name']; + final schema = p['schema'] as Map?; + if (name != null && schema != null) { + String desc = (schema['description'] ?? p['description'] ?? '').toString(); + if (schema['enum'] is List) { + desc = '$desc. Possible values: ${(schema['enum'] as List).join(', ')}'; + } + tool['parameters']['properties'][name] = { + 'type': schema['type'], + 'description': desc, + }; + if (p['required'] == true) { + (tool['parameters']['required'] as List).add(name); + } + } + } + } + } + // requestBody + final reqBody = operation['requestBody']; + if (reqBody is Map) { + final content = reqBody['content']; + if (content is Map && content['application/json'] is Map) { + final schema = content['application/json']['schema']; + final resolved = _resolveSchemaSimple(schema, openApi['components'] as Map?); + if (resolved['properties'] is Map) { + tool['parameters']['properties'] = { + ...tool['parameters']['properties'], + ...resolved['properties'] as Map, + }; + if (resolved['required'] is List) { + final req = Set.from(tool['parameters']['required'] as List) + ..addAll(resolved['required'] as List); + tool['parameters']['required'] = req.toList(); + } + } else if (resolved['type'] == 'array') { + tool['parameters'] = resolved; + } + } + } + tools.add(tool); + } + }); + }); + return tools; +} diff --git a/lib/features/chat/widgets/assistant_message_widget.dart b/lib/features/chat/widgets/assistant_message_widget.dart index e18f198..f98a268 100644 --- a/lib/features/chat/widgets/assistant_message_widget.dart +++ b/lib/features/chat/widgets/assistant_message_widget.dart @@ -2,11 +2,12 @@ import 'package:flutter/material.dart'; import 'package:flutter/cupertino.dart'; import 'package:flutter_riverpod/flutter_riverpod.dart'; import 'package:flutter_animate/flutter_animate.dart'; -import 'dart:async'; +import 'dart:convert'; import 'dart:io' show Platform; import '../../../shared/theme/theme_extensions.dart'; import '../../../shared/widgets/markdown/streaming_markdown_widget.dart'; import '../../../core/utils/reasoning_parser.dart'; +import '../../../core/utils/tool_calls_parser.dart'; import 'enhanced_image_attachment.dart'; import 'package:conduit/l10n/app_localizations.dart'; import 'enhanced_attachment.dart'; @@ -42,15 +43,14 @@ class _AssistantMessageWidgetState extends ConsumerState late AnimationController _fadeController; late AnimationController _slideController; ReasoningContent? _reasoningContent; - String _renderedContent = ''; - Timer? _throttleTimer; - String? _pendingContent; + List _toolSegments = const []; + final Set _expandedToolIds = {}; Widget? _cachedAvatar; + String _contentSansDetails = ''; @override void initState() { super.initState(); - _renderedContent = widget.message.content ?? ''; _fadeController = AnimationController( duration: const Duration(milliseconds: 200), vsync: this, @@ -60,8 +60,8 @@ class _AssistantMessageWidgetState extends ConsumerState vsync: this, ); - // Parse reasoning content if present - _updateReasoningContent(); + // Parse reasoning and tool-calls sections + _reparseSections(); } @override @@ -75,11 +75,9 @@ class _AssistantMessageWidgetState extends ConsumerState void didUpdateWidget(AssistantMessageWidget oldWidget) { super.didUpdateWidget(oldWidget); - // Re-parse reasoning content when message content changes + // Re-parse sections when message content changes if (oldWidget.message.content != widget.message.content) { - // Throttle markdown re-rendering for smoother streaming - _scheduleRenderUpdate(widget.message.content ?? ''); - _updateReasoningContent(); + _reparseSections(); } // Rebuild cached avatar if model name changes @@ -88,49 +86,209 @@ class _AssistantMessageWidgetState extends ConsumerState } } - void _updateReasoningContent() { - if (widget.message.content != null) { - final newReasoningContent = ReasoningParser.parseReasoningContent( - widget.message.content!, - ); - if (newReasoningContent != _reasoningContent) { - setState(() { - _reasoningContent = newReasoningContent; - }); - } + void _reparseSections() { + final raw0 = widget.message.content ?? ''; + // Strip any leftover placeholders from content before parsing + const ti = '[TYPING_INDICATOR]'; + const searchBanner = '🔍 Searching the web...'; + String raw = raw0; + if (raw.startsWith(ti)) { + raw = raw.substring(ti.length); } - } + if (raw.startsWith(searchBanner)) { + raw = raw.substring(searchBanner.length); + } + final rc = ReasoningParser.parseReasoningContent(raw); + String base = rc?.mainContent ?? raw; - void _scheduleRenderUpdate(String rawContent) { - final safe = _safeForStreaming(rawContent); - if (_throttleTimer != null && _throttleTimer!.isActive) { - _pendingContent = safe; - return; - } - if (mounted) { - setState(() => _renderedContent = safe); - } else { - _renderedContent = safe; - } - _throttleTimer = Timer(const Duration(milliseconds: 80), () { - if (!mounted) return; - if (_pendingContent != null) { - setState(() { - _renderedContent = _pendingContent!; - _pendingContent = null; - }); - } + final tools = ToolCallsParser.parse(base); + final segments = ToolCallsParser.segments(base); + + setState(() { + _reasoningContent = rc; + _contentSansDetails = tools?.mainContent ?? base; + _toolSegments = segments ?? [ToolCallsSegment.text(_contentSansDetails)]; }); } - String _safeForStreaming(String content) { - if (content.isEmpty) return content; - // Auto-close an unbalanced triple backtick fence during streaming so markdown stays valid - final fenceCount = '```'.allMatches(content).length; - if (fenceCount.isOdd) { - return '$content\n```'; + // No streaming-specific markdown fixes needed here; handled by Markdown widget + + Widget _buildToolCallTile(ToolCallEntry tc) { + final isExpanded = _expandedToolIds.contains(tc.id); + final theme = context.conduitTheme; + + String _pretty(dynamic v, {int max = 1200}) { + try { + final pretty = const JsonEncoder.withIndent(' ').convert(v); + return pretty.length > max ? pretty.substring(0, max) + '\n…' : pretty; + } catch (_) { + final s = v?.toString() ?? ''; + return s.length > max ? s.substring(0, max) + '…' : s; + } } - return content; + + return Padding( + padding: const EdgeInsets.only(bottom: Spacing.xs), + child: InkWell( + onTap: () { + setState(() { + if (isExpanded) { + _expandedToolIds.remove(tc.id); + } else { + _expandedToolIds.add(tc.id); + } + }); + }, + borderRadius: BorderRadius.circular(AppBorderRadius.md), + child: Container( + width: double.infinity, + padding: const EdgeInsets.symmetric( + horizontal: Spacing.sm, + vertical: Spacing.xs, + ), + decoration: BoxDecoration( + color: theme.surfaceContainer.withValues(alpha: 0.5), + borderRadius: BorderRadius.circular(AppBorderRadius.md), + border: Border.all( + color: theme.dividerColor, + width: BorderWidth.thin, + ), + ), + child: Column( + crossAxisAlignment: CrossAxisAlignment.start, + children: [ + Row( + mainAxisSize: MainAxisSize.min, + children: [ + Icon( + isExpanded + ? Icons.expand_less_rounded + : Icons.expand_more_rounded, + size: 16, + color: theme.textSecondary, + ), + const SizedBox(width: Spacing.xs), + Icon( + tc.done ? Icons.build_circle_outlined : Icons.play_circle_outline, + size: 14, + color: theme.buttonPrimary, + ), + const SizedBox(width: Spacing.xs), + Flexible( + child: Text( + tc.done + ? 'Tool Executed: ${tc.name}' + : 'Running tool: ${tc.name}…', + overflow: TextOverflow.ellipsis, + style: TextStyle( + fontSize: AppTypography.bodySmall, + color: theme.textSecondary, + fontWeight: FontWeight.w500, + ), + ), + ), + ], + ), + + AnimatedCrossFade( + firstChild: const SizedBox.shrink(), + secondChild: Container( + margin: const EdgeInsets.only(top: Spacing.sm), + padding: const EdgeInsets.all(Spacing.sm), + decoration: BoxDecoration( + color: theme.surfaceContainer.withValues(alpha: 0.3), + borderRadius: BorderRadius.circular(AppBorderRadius.md), + border: Border.all( + color: theme.dividerColor, + width: BorderWidth.thin, + ), + ), + child: Column( + crossAxisAlignment: CrossAxisAlignment.start, + children: [ + if (tc.arguments != null) ...[ + Text( + 'Arguments', + style: TextStyle( + fontSize: AppTypography.bodySmall, + color: theme.textSecondary, + fontWeight: FontWeight.w600, + ), + ), + const SizedBox(height: Spacing.xxs), + SelectableText( + _pretty(tc.arguments), + style: TextStyle( + fontSize: AppTypography.bodySmall, + color: theme.textSecondary, + fontFamily: 'monospace', + height: 1.35, + ), + ), + const SizedBox(height: Spacing.sm), + ], + + if (tc.result != null) ...[ + Text( + 'Result', + style: TextStyle( + fontSize: AppTypography.bodySmall, + color: theme.textSecondary, + fontWeight: FontWeight.w600, + ), + ), + const SizedBox(height: Spacing.xxs), + SelectableText( + _pretty(tc.result), + style: TextStyle( + fontSize: AppTypography.bodySmall, + color: theme.textSecondary, + fontFamily: 'monospace', + height: 1.35, + ), + ), + ], + ], + ), + ), + crossFadeState: + isExpanded ? CrossFadeState.showSecond : CrossFadeState.showFirst, + duration: const Duration(milliseconds: 200), + ), + ], + ), + ), + ), + ); + } + + Widget _buildSegmentedContent() { + final children = []; + for (final seg in _toolSegments) { + if (seg.isToolCall && seg.entry != null) { + children.add(_buildToolCallTile(seg.entry!)); + } else if ((seg.text ?? '').trim().isNotEmpty) { + children.add( + _buildEnhancedMarkdownContent(seg.text!), + ); + } + } + + if (children.isEmpty) return const SizedBox.shrink(); + return Column( + crossAxisAlignment: CrossAxisAlignment.start, + children: children, + ); + } + + bool get _hasRenderableSegments { + for (final seg in _toolSegments) { + if ((seg.isToolCall && seg.entry != null) || + ((seg.text ?? '').trim().isNotEmpty)) { + return true; + } + } + return false; } void _buildCachedAvatar() { @@ -170,7 +328,6 @@ class _AssistantMessageWidgetState extends ConsumerState void dispose() { _fadeController.dispose(); _slideController.dispose(); - _throttleTimer?.cancel(); super.dispose(); } @@ -312,23 +469,18 @@ class _AssistantMessageWidgetState extends ConsumerState const SizedBox(height: Spacing.md), ], - if (widget.isStreaming && + // Tool calls are rendered inline via segmented content + + // If there are any renderable segments (tool calls or text), + // render them even during streaming to avoid showing the + // typing indicator underneath. + if (!_hasRenderableSegments && + widget.isStreaming && (widget.message.content.trim().isEmpty || widget.message.content == '[TYPING_INDICATOR]')) _buildTypingIndicator() - else if (widget.isStreaming && - widget.message.content.isNotEmpty && - widget.message.content != '[TYPING_INDICATOR]') - // While streaming, render only main content (strip reasoning details to avoid flashing tags) - _buildEnhancedMarkdownContent( - _reasoningContent?.mainContent ?? _renderedContent, - ) else - // After streaming finishes (or static content), render full markdown - _buildEnhancedMarkdownContent( - _reasoningContent?.mainContent ?? - widget.message.content, - ), + _buildSegmentedContent(), ], ), ), @@ -356,8 +508,9 @@ class _AssistantMessageWidgetState extends ConsumerState return const SizedBox.shrink(); } - // Process content to ensure proper image rendering - final processedContent = _processContentForImages(content); + // Sanitize tool-call
blocks and process images + final toolSanitized = ToolCallsParser.summarize(content); + final processedContent = _processContentForImages(toolSanitized); return StreamingMarkdownWidget( staticContent: processedContent, diff --git a/pubspec.lock b/pubspec.lock index a8cfd6c..f12db3e 100644 --- a/pubspec.lock +++ b/pubspec.lock @@ -1098,6 +1098,22 @@ packages: description: flutter source: sdk version: "0.0.0" + socket_io_client: + dependency: "direct main" + description: + name: socket_io_client + sha256: ede469f3e4c55e8528b4e023bdedbc20832e8811ab9b61679d1ba3ed5f01f23b + url: "https://pub.dev" + source: hosted + version: "2.0.3+1" + socket_io_common: + dependency: transitive + description: + name: socket_io_common + sha256: "2ab92f8ff3ebbd4b353bf4a98bee45cc157e3255464b2f90f66e09c4472047eb" + url: "https://pub.dev" + source: hosted + version: "2.0.3" source_gen: dependency: transitive description: @@ -1451,7 +1467,7 @@ packages: source: hosted version: "6.5.0" yaml: - dependency: transitive + dependency: "direct main" description: name: yaml sha256: b9da305ac7c39faa3f030eccd175340f968459dae4af175130b3fc47e40d76ce diff --git a/pubspec.yaml b/pubspec.yaml index 5ae43a5..fb6ef4c 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -28,6 +28,8 @@ dependencies: markdown_widget: ^2.3.2+8 flutter_highlight: ^0.7.0 cached_network_image: ^3.3.1 + socket_io_client: ^2.0.3 + yaml: ^3.1.2